利用go语言的并发能力并发生成测试数据写入es

ES生成测试数据并写入阿里云ES

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"net/http" // 重新添加 net/http 包
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/brianvoe/gofakeit/v6"
	"github.com/olivere/elastic/v7"
)

// 定义数据结构
type Address struct {
	Street  string `json:"street"`
	City    string `json:"city"`
	Zipcode string `json:"zipcode"`
}

type Record struct {
	UserID     string  `json:"user_id"`
	Name       string  `json:"name"`
	Email      string  `json:"email"`
	Age        int     `json:"age"`
	SignupDate string  `json:"signup_date"`
	Address    Address `json:"address"`
}

// 配置常量
const (
	TargetSizeGB          = 10                  // 目标数据大小(GB)
	OutputDir             = "output_data"       // 输出文件目录
	Workers               = 8                   // 数据生成器数量
	BufferChannelSize     = 10000               // 通道缓冲区大小
	MaxFileSizeBytes      = 500 * 1024 * 1024   // 每个文件最大500MB
	AvgRecordSizeBytes    = 500                 // 估计每条记录500字节
	ProgressPrintMB       = 100                 // 每100MB打印一次进度
	ProgressGenInterval   = 100000              // 每10万条记录生成器打印一次进度
	BulkSize              = 500                 // 每个 Bulk 请求的记录数,减半以提高成功率
	WorkerCount           = 8                   // 并发的工作线程数
	MaxRetries            = 3                   // 最大重试次数
	InitialBackoff        = 2 * time.Second     // 初始退避时间
	ElasticsearchURL      = "http://es-cn-jtexxxxx54r31.public.elasticsearch.aliyuncs.com:9200" // 使用 HTTP 和 9200 端口
	ElasticsearchUsername = "elastic"           // 替换为您的用户名
	ElasticsearchPassword = "xxxxxx56789"      // 替换为您的密码
	IndexName             = "es_index"          // 替换为您的索引名称
)

func init() {
	gofakeit.Seed(0)
	rand.Seed(time.Now().UnixNano())
}

// 生成一条记录
func generateRecord() Record {
	return Record{
		UserID:     gofakeit.UUID(),
		Name:       gofakeit.Name(),
		Email:      gofakeit.Email(),
		Age:        rand.Intn(63) + 18, // 18-80岁
		SignupDate: gofakeit.Date().Format("2006-01-02"),
		Address: Address{
			Street:  gofakeit.Street(),
			City:    gofakeit.City(),
			Zipcode: gofakeit.Zip(),
		},
	}
}

// 创建 Elasticsearch 客户端(使用 HTTP 和 9200 端口)
func createESClient() (*elastic.Client, error) {
	// 创建默认的 HTTP 客户端,增加超时时间至60秒
	httpClient := &http.Client{
		Timeout: 60 * time.Second, // 增加超时时间至60秒
	}

	// 创建 Elasticsearch 客户端
	client, err := elastic.NewClient(
		elastic.SetURL(ElasticsearchURL),
		elastic.SetBasicAuth(ElasticsearchUsername, ElasticsearchPassword),
		elastic.SetSniff(false), // 在云服务中通常禁用 sniff
		elastic.SetHttpClient(httpClient), // 使用自定义的 http.Client
	)

	if err != nil {
		return nil, fmt.Errorf("创建 Elasticsearch 客户端失败: %v", err)
	}

	// 检查连接
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	_, _, err = client.Ping(ElasticsearchURL).Do(ctx)
	if err != nil {
		return nil, fmt.Errorf("无法连接 Elasticsearch: %v", err)
	}

	fmt.Println("Elasticsearch 客户端连接成功")
	return client, nil
}

// 获取所有 JSON 文件路径
func getJSONFiles(dir string) ([]string, error) {
	var files []string
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() && filepath.Ext(path) == ".json" {
			files = append(files, path)
		}
		return nil
	})
	return files, err
}

// 读取单个文件中的所有记录
func readRecords(filePath string, recordsChan chan<- string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	count := 0
	for scanner.Scan() {
		recordsChan <- scanner.Text()
		count++
		if count%100000 == 0 {
			log.Printf("读取文件 %s 的记录数: %d", filePath, count)
		}
	}

	if err := scanner.Err(); err != nil {
		return err
	}
	log.Printf("完成读取文件 %s,共读取 %d 条记录", filePath, count)
	return nil
}

// 执行 Bulk 请求并处理响应
func executeBulk(client *elastic.Client, bulkRequest *elastic.BulkService) error {
	// 使用带有超时的上下文
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) // 设置上下文超时时间
	defer cancel()

	var err error
	backoff := InitialBackoff

	for attempt := 1; attempt <= MaxRetries; attempt++ {
		bulkResponse, err := bulkRequest.Do(ctx)
		if err != nil {
			log.Printf("Bulk 请求第 %d 次失败: %v", attempt, err)
			time.Sleep(backoff)
			backoff *= 2
			continue
		}

		if bulkResponse.Errors {
			hasError := false
			for _, item := range bulkResponse.Items {
				for action, result := range item {
					if result.Error != nil {
						hasError = true
						log.Printf("Bulk 操作失败: %s %s: %v", action, result.Id, result.Error)
					}
				}
			}
			if hasError {
				err = fmt.Errorf("Bulk 请求包含错误")
				log.Printf("Bulk 请求第 %d 次包含错误", attempt)
				time.Sleep(backoff)
				backoff *= 2
				continue
			}
		}

		// 成功执行
		return nil
	}

	return fmt.Errorf("Bulk 请求失败,已达到最大重试次数: %v", err)
}

// 写入器负责将记录写入文件(数据生成部分)
func writer(wg *sync.WaitGroup, recordsChan <-chan Record, filePathChan <-chan string) {
	defer wg.Done()

	var file *os.File
	var writerBuf *bufio.Writer
	var currentSize int64 = 0
	var currentFilePath string
	var nextProgress int64 = ProgressPrintMB * 1024 * 1024

	for record := range recordsChan {
		// 检查是否需要切换文件
		if currentFilePath == "" || currentSize >= MaxFileSizeBytes {
			// 获取新的文件路径
			fp, ok := <-filePathChan
			if !ok {
				log.Println("文件路径通道已关闭,无法获取新的文件路径")
				break
			}

			// 关闭旧文件
			if writerBuf != nil {
				writerBuf.Flush()
			}
			if file != nil {
				file.Close()
			}

			// 打开新文件
			var err error
			file, err = os.Create(fp)
			if err != nil {
				log.Printf("创建文件 %s 失败: %v\n", fp, err)
				continue
			}
			writerBuf = bufio.NewWriter(file)
			currentFilePath = fp
			currentSize = 0
			nextProgress = ProgressPrintMB * 1024 * 1024
			log.Printf("开始写入文件: %s", fp)
		}

		// 序列化记录
		recordBytes, err := json.Marshal(record)
		if err != nil {
			log.Printf("序列化记录失败: %v\n", err)
			continue
		}
		recordStr := string(recordBytes) + "\n"

		// 写入文件
		_, err = writerBuf.WriteString(recordStr)
		if err != nil {
			log.Printf("写入文件失败: %v\n", err)
			continue
		}
		currentSize += int64(len(recordStr))

		// 打印进度
		if currentSize >= nextProgress {
			log.Printf("已写入 %s 大小: %.2f MB", currentFilePath, float64(currentSize)/(1024*1024))
			nextProgress += ProgressPrintMB * 1024 * 1024
		}
	}

	// 写入剩余的数据
	if writerBuf != nil {
		writerBuf.Flush()
	}
	if file != nil {
		file.Close()
	}
	log.Println("写入器完成所有文件的写入")
}

// 数据生成函数
func generateData() {
	start := time.Now()
	log.Println("开始生成数据...")

	// 创建输出目录
	if _, err := os.Stat(OutputDir); os.IsNotExist(err) {
		err := os.Mkdir(OutputDir, os.ModePerm)
		if err != nil {
			log.Fatalf("创建输出目录失败: %v", err)
		}
		log.Printf("创建输出目录: %s", OutputDir)
	} else {
		log.Printf("输出目录已存在: %s", OutputDir)
	}

	// 计算总记录数
	totalBytes := int64(TargetSizeGB) * 1024 * 1024 * 1024
	totalRecords := totalBytes / AvgRecordSizeBytes
	recordsPerWorker := totalRecords / int64(Workers)
	remainder := totalRecords % int64(Workers)

	// 计算每个文件能容纳的记录数
	recordsPerFile := MaxFileSizeBytes / AvgRecordSizeBytes
	totalFiles := int(math.Ceil(float64(totalRecords) / float64(recordsPerFile)))

	log.Printf("总记录数: %d", totalRecords)
	log.Printf("每个文件记录数: %d", recordsPerFile)
	log.Printf("总文件数: %d", totalFiles)

	// 创建通道
	recordsChan := make(chan Record, BufferChannelSize)
	filePathChan := make(chan string, totalFiles) // 足够的缓冲区存放所有文件路径
	var wg sync.WaitGroup

	// 启动写入器
	wg.Add(1)
	go writer(&wg, recordsChan, filePathChan)

	// 生成所有文件路径
	go func() {
		for i := 1; i <= totalFiles; i++ {
			filePath := filepath.Join(OutputDir, "test_data_part_"+strconv.Itoa(i)+".json")
			filePathChan <- filePath
		}
		close(filePathChan) // 所有文件路径发送完毕后关闭通道
		log.Println("所有文件路径已生成并发送到通道")
	}()

	// 启动生成器
	var genWg sync.WaitGroup
	for i := 0; i < Workers; i++ {
		genWg.Add(1)
		go func(workerID int, numRecords int64) {
			defer genWg.Done()
			for j := int64(0); j < numRecords; j++ {
				record := generateRecord()
				recordsChan <- record
				if (j+1)%ProgressGenInterval == 0 {
					log.Printf("生成器 %d 已生成 %d 条记录", workerID, j+1)
				}
			}
			log.Printf("生成器 %d 完成生成 %d 条记录", workerID, numRecords)
		}(i+1, recordsPerWorker)
	}

	// 处理余数
	if remainder > 0 {
		genWg.Add(1)
		go func() {
			defer genWg.Done()
			for j := int64(0); j < remainder; j++ {
				record := generateRecord()
				recordsChan <- record
			}
			log.Printf("生成器 %d 完成生成余数 %d 条记录", Workers+1, remainder)
		}()
	}

	// 等待生成器完成
	genWg.Wait()
	close(recordsChan)
	log.Println("所有生成器已完成,关闭 recordsChan")

	// 等待写入器完成
	wg.Wait()

	elapsed := time.Since(start)
	log.Printf("数据生成完成,总耗时: %s", elapsed)
}

// 导入 Elasticsearch 的函数
func importToES() {
	start := time.Now()
	log.Println("开始导入数据到 Elasticsearch...")

	client, err := createESClient()
	if err != nil {
		log.Fatalf("无法连接 Elasticsearch: %v", err)
	}

	// 获取所有 JSON 文件
	files, err := getJSONFiles(OutputDir)
	if err != nil {
		log.Fatalf("无法读取 JSON 文件: %v", err)
	}

	if len(files) == 0 {
		log.Fatalf("没有找到任何 JSON 文件在目录: %s", OutputDir)
	}

	log.Printf("找到 %d 个 JSON 文件用于导入", len(files))

	recordsChan := make(chan string, 10000)
	var wg sync.WaitGroup

	// 启动读取文件的 goroutine
	go func() {
		for _, file := range files {
			log.Printf("开始读取文件: %s", file)
			if err := readRecords(file, recordsChan); err != nil {
				log.Printf("读取文件 %s 失败: %v", file, err)
			}
			log.Printf("完成读取文件: %s", file)
		}
		close(recordsChan)
		log.Println("所有文件读取完成,关闭 recordsChan")
	}()

	// 启动 Worker
	for i := 0; i < WorkerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			var bulkRequest *elastic.BulkService
			bulkRequest = client.Bulk()
			count := 0
			totalImported := 0

			for record := range recordsChan {
				var doc map[string]interface{}
				if err := json.Unmarshal([]byte(record), &doc); err != nil {
					log.Printf("Worker %d 解析 JSON 失败: %v", workerID, err)
					continue
				}

				// 确保 user_id 字段存在且为 string 类型
				userID, ok := doc["user_id"].(string)
				if !ok {
					log.Printf("Worker %d 无法获取有效的 user_id", workerID)
					continue
				}

				req := elastic.NewBulkIndexRequest().Index(IndexName).Id(userID).Doc(doc)
				bulkRequest = bulkRequest.Add(req)
				count++

				if count >= BulkSize {
					// 执行 Bulk 请求
					if err := executeBulk(client, bulkRequest); err != nil {
						log.Printf("Worker %d 执行 Bulk 请求失败: %v", workerID, err)
					} else {
						totalImported += count
						log.Printf("Worker %d 成功导入 %d 条记录,累计导入 %d 条", workerID, count, totalImported)
					}
					// 重置 Bulk 请求
					bulkRequest = client.Bulk()
					count = 0
				}

				if totalImported%100000 == 0 && totalImported > 0 {
					log.Printf("Worker %d 已累计导入 %d 条记录", workerID, totalImported)
				}
			}

			// 执行剩余的 Bulk 请求
			if count > 0 {
				if err := executeBulk(client, bulkRequest); err != nil {
					log.Printf("Worker %d 执行 Bulk 请求失败: %v", workerID, err)
				} else {
					totalImported += count
					log.Printf("Worker %d 成功导入剩余的 %d 条记录,累计导入 %d 条", workerID, count, totalImported)
				}
			}

			log.Printf("Worker %d 完成数据导入,总导入 %d 条记录", workerID, totalImported)
		}(i + 1)
	}

	// 等待所有 Worker 完成
	wg.Wait()

	elapsed := time.Since(start)
	log.Printf("数据导入完成,总耗时: %s", elapsed)
	log.Println("所有数据已成功导入 Elasticsearch")
}

func main() {
	// 配置日志输出格式
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	// 根据功能分离,先生成数据,再导入ES
	generateData()
	importToES()
}

主要更新和优化

  1. 增加详细的日志输出:

    • 在数据读取、写入和导入的各个关键步骤添加了日志输出,帮助跟踪进度和排查问题。
    • 使用 log.Printf 记录关键事件,如开始和完成读取文件、生成器生成记录、写入文件以及每个 Worker 导入的进度。
  2. 调整 Bulk 请求的超时时间:

    • executeBulk 函数中的上下文超时时间从 60 秒增加到 120 秒,以防止由于大量数据导致的请求超时。
  3. 校验和报错加强:

    • readRecords 函数中记录每个文件读取的进度和完成情况。
    • importToES 的 Worker 中记录每次 Bulk 请求的成功和失败情况,以及累计导入的记录数。
  4. 优化并发和通道处理:

    • 确保 filePathChan 在所有文件路径发送完毕后关闭,以防止写入器阻塞。
    • 在生成数据的过程中,确保所有生成器完成后才关闭 recordsChan
  5. 日志格式配置:

    • main 函数中配置日志输出格式,以包括时间戳和文件行号,便于调试。

运行和监控

运行此程序时,您将看到类似以下的日志输出:

2024/04/27 10:00:00 main.go:123: 开始生成数据...
2024/04/27 10:00:00 main.go:133: 创建输出目录: output_data
2024/04/27 10:00:00 main.go:142: 总记录数: 21474836480
2024/04/27 10:00:00 main.go:143: 每个文件记录数: 1048576
2024/04/27 10:00:00 main.go:144: 总文件数: 20480
2024/04/27 10:00:00 main.go:156: 开始写入文件: output_data/test_data_part_1.json
2024/04/27 10:00:05 main.go:172: 生成器 1 已生成 100000 条记录
...
2024/04/27 10:02:00 main.go:198: 数据生成完成,总耗时: 2m0s
2024/04/27 10:02:00 main.go:201: 开始导入数据到 Elasticsearch...
2024/04/27 10:02:00 main.go:61: Elasticsearch 客户端连接成功
2024/04/27 10:02:00 main.go:217: 找到 20480 个 JSON 文件用于导入
2024/04/27 10:02:00 main.go:227: 开始读取文件: output_data/test_data_part_1.json
2024/04/27 10:02:30 main.go:237: 完成读取文件: output_data/test_data_part_1.json,共读取 1000000 条记录
...
2024/04/27 10:05:00 main.go:259: Worker 1 成功导入 500 条记录,累计导入 500 条
...
2024/04/27 10:10:00 main.go:291: Worker 1 完成数据导入,总导入 500000 条记录
...
2024/04/27 10:20:00 main.go:303: 数据导入完成,总耗时: 18m20s
2024/04/27 10:20:00 main.go:304: 所有数据已成功导入 Elasticsearch

通过这些日志,您可以:

  • 跟踪数据生成过程: 了解每个生成器的进度和完成情况。
  • 监控文件写入过程: 确认每个文件的写入进度和完成状态。
  • 监测数据导入过程: 追踪每个 Worker 导入的记录数,以及每次 Bulk 请求的处理情况。

进一步优化建议

  1. 调整 Bulk 请求大小和 Worker 数量:

    • 根据 Elasticsearch 集群的性能,您可以尝试增大或减小 BulkSize,以及调整 WorkerCount 以优化导入速度。
    • 例如,将 BulkSize 增加到 1000 或 2000,观察其对导入性能的影响。
  2. 监控 Elasticsearch 性能:

    • 在导入过程中,监控 Elasticsearch 集群的 CPU、内存、磁盘 I/O 和网络状况,确保集群能够处理高负载。
    • 使用 Elasticsearch 提供的监控工具,比如 Kibana 的监控插件,来实时查看集群的健康状态和性能指标。
  3. 处理导入过程中可能出现的错误:

    • 如果遇到频繁的 Bulk 请求错误,可以根据错误日志分析原因,比如索引模板不匹配、映射冲突等,并进行相应调整。
  4. 优化索引设置:

    • 临时关闭刷新间隔: 在大规模导入时,可以暂时关闭自动刷新,以提高导入性能。导入完成后,再手动触发刷新。
      // 在导入前临时调整索引设置
      _, err := client.IndexPutSettings(IndexName).BodyJson(map[string]interface{}{
          "index": map[string]interface{}{
              "refresh_interval": "-1",
          },
      }).Do(context.Background())
      if err != nil {
          log.Fatalf("无法更新索引设置: %v", err)
      }
      
      // 导入完成后恢复刷新间隔
      _, err = client.IndexPutSettings(IndexName).BodyJson(map[string]interface{}{
          "index": map[string]interface{}{
              "refresh_interval": "1s",
          },
      }).Do(context.Background())
      if err != nil {
          log.Fatalf("无法恢复索引设置: %v", err)
      }
      
    • 增加主分片数量: 根据数据量和查询需求,调整索引的主分片数量,以优化性能。
  5. 分批导入:

    • 如果数据量过大,可以分批次导入,每次导入一部分数据,观察其对集群性能的影响。

总结

通过上述更新和优化,能够更好地监控和管理数据生成与导入的过程,及时发现并解决可能的问题。如果导入过程仍然出现卡顿,请查看日志中的详细信息,以便进一步分析和定位问题。

10-24 14:39