利用go语言的并发能力并发生成测试数据写入es
ES生成测试数据并写入阿里云ES
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log"
"math"
"math/rand"
"net/http" // 重新添加 net/http 包
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/olivere/elastic/v7"
)
// 定义数据结构
type Address struct {
Street string `json:"street"`
City string `json:"city"`
Zipcode string `json:"zipcode"`
}
type Record struct {
UserID string `json:"user_id"`
Name string `json:"name"`
Email string `json:"email"`
Age int `json:"age"`
SignupDate string `json:"signup_date"`
Address Address `json:"address"`
}
// 配置常量
const (
TargetSizeGB = 10 // 目标数据大小(GB)
OutputDir = "output_data" // 输出文件目录
Workers = 8 // 数据生成器数量
BufferChannelSize = 10000 // 通道缓冲区大小
MaxFileSizeBytes = 500 * 1024 * 1024 // 每个文件最大500MB
AvgRecordSizeBytes = 500 // 估计每条记录500字节
ProgressPrintMB = 100 // 每100MB打印一次进度
ProgressGenInterval = 100000 // 每10万条记录生成器打印一次进度
BulkSize = 500 // 每个 Bulk 请求的记录数,减半以提高成功率
WorkerCount = 8 // 并发的工作线程数
MaxRetries = 3 // 最大重试次数
InitialBackoff = 2 * time.Second // 初始退避时间
ElasticsearchURL = "http://es-cn-jtexxxxx54r31.public.elasticsearch.aliyuncs.com:9200" // 使用 HTTP 和 9200 端口
ElasticsearchUsername = "elastic" // 替换为您的用户名
ElasticsearchPassword = "xxxxxx56789" // 替换为您的密码
IndexName = "es_index" // 替换为您的索引名称
)
func init() {
gofakeit.Seed(0)
rand.Seed(time.Now().UnixNano())
}
// 生成一条记录
func generateRecord() Record {
return Record{
UserID: gofakeit.UUID(),
Name: gofakeit.Name(),
Email: gofakeit.Email(),
Age: rand.Intn(63) + 18, // 18-80岁
SignupDate: gofakeit.Date().Format("2006-01-02"),
Address: Address{
Street: gofakeit.Street(),
City: gofakeit.City(),
Zipcode: gofakeit.Zip(),
},
}
}
// 创建 Elasticsearch 客户端(使用 HTTP 和 9200 端口)
func createESClient() (*elastic.Client, error) {
// 创建默认的 HTTP 客户端,增加超时时间至60秒
httpClient := &http.Client{
Timeout: 60 * time.Second, // 增加超时时间至60秒
}
// 创建 Elasticsearch 客户端
client, err := elastic.NewClient(
elastic.SetURL(ElasticsearchURL),
elastic.SetBasicAuth(ElasticsearchUsername, ElasticsearchPassword),
elastic.SetSniff(false), // 在云服务中通常禁用 sniff
elastic.SetHttpClient(httpClient), // 使用自定义的 http.Client
)
if err != nil {
return nil, fmt.Errorf("创建 Elasticsearch 客户端失败: %v", err)
}
// 检查连接
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, _, err = client.Ping(ElasticsearchURL).Do(ctx)
if err != nil {
return nil, fmt.Errorf("无法连接 Elasticsearch: %v", err)
}
fmt.Println("Elasticsearch 客户端连接成功")
return client, nil
}
// 获取所有 JSON 文件路径
func getJSONFiles(dir string) ([]string, error) {
var files []string
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && filepath.Ext(path) == ".json" {
files = append(files, path)
}
return nil
})
return files, err
}
// 读取单个文件中的所有记录
func readRecords(filePath string, recordsChan chan<- string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
count := 0
for scanner.Scan() {
recordsChan <- scanner.Text()
count++
if count%100000 == 0 {
log.Printf("读取文件 %s 的记录数: %d", filePath, count)
}
}
if err := scanner.Err(); err != nil {
return err
}
log.Printf("完成读取文件 %s,共读取 %d 条记录", filePath, count)
return nil
}
// 执行 Bulk 请求并处理响应
func executeBulk(client *elastic.Client, bulkRequest *elastic.BulkService) error {
// 使用带有超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) // 设置上下文超时时间
defer cancel()
var err error
backoff := InitialBackoff
for attempt := 1; attempt <= MaxRetries; attempt++ {
bulkResponse, err := bulkRequest.Do(ctx)
if err != nil {
log.Printf("Bulk 请求第 %d 次失败: %v", attempt, err)
time.Sleep(backoff)
backoff *= 2
continue
}
if bulkResponse.Errors {
hasError := false
for _, item := range bulkResponse.Items {
for action, result := range item {
if result.Error != nil {
hasError = true
log.Printf("Bulk 操作失败: %s %s: %v", action, result.Id, result.Error)
}
}
}
if hasError {
err = fmt.Errorf("Bulk 请求包含错误")
log.Printf("Bulk 请求第 %d 次包含错误", attempt)
time.Sleep(backoff)
backoff *= 2
continue
}
}
// 成功执行
return nil
}
return fmt.Errorf("Bulk 请求失败,已达到最大重试次数: %v", err)
}
// 写入器负责将记录写入文件(数据生成部分)
func writer(wg *sync.WaitGroup, recordsChan <-chan Record, filePathChan <-chan string) {
defer wg.Done()
var file *os.File
var writerBuf *bufio.Writer
var currentSize int64 = 0
var currentFilePath string
var nextProgress int64 = ProgressPrintMB * 1024 * 1024
for record := range recordsChan {
// 检查是否需要切换文件
if currentFilePath == "" || currentSize >= MaxFileSizeBytes {
// 获取新的文件路径
fp, ok := <-filePathChan
if !ok {
log.Println("文件路径通道已关闭,无法获取新的文件路径")
break
}
// 关闭旧文件
if writerBuf != nil {
writerBuf.Flush()
}
if file != nil {
file.Close()
}
// 打开新文件
var err error
file, err = os.Create(fp)
if err != nil {
log.Printf("创建文件 %s 失败: %v\n", fp, err)
continue
}
writerBuf = bufio.NewWriter(file)
currentFilePath = fp
currentSize = 0
nextProgress = ProgressPrintMB * 1024 * 1024
log.Printf("开始写入文件: %s", fp)
}
// 序列化记录
recordBytes, err := json.Marshal(record)
if err != nil {
log.Printf("序列化记录失败: %v\n", err)
continue
}
recordStr := string(recordBytes) + "\n"
// 写入文件
_, err = writerBuf.WriteString(recordStr)
if err != nil {
log.Printf("写入文件失败: %v\n", err)
continue
}
currentSize += int64(len(recordStr))
// 打印进度
if currentSize >= nextProgress {
log.Printf("已写入 %s 大小: %.2f MB", currentFilePath, float64(currentSize)/(1024*1024))
nextProgress += ProgressPrintMB * 1024 * 1024
}
}
// 写入剩余的数据
if writerBuf != nil {
writerBuf.Flush()
}
if file != nil {
file.Close()
}
log.Println("写入器完成所有文件的写入")
}
// 数据生成函数
func generateData() {
start := time.Now()
log.Println("开始生成数据...")
// 创建输出目录
if _, err := os.Stat(OutputDir); os.IsNotExist(err) {
err := os.Mkdir(OutputDir, os.ModePerm)
if err != nil {
log.Fatalf("创建输出目录失败: %v", err)
}
log.Printf("创建输出目录: %s", OutputDir)
} else {
log.Printf("输出目录已存在: %s", OutputDir)
}
// 计算总记录数
totalBytes := int64(TargetSizeGB) * 1024 * 1024 * 1024
totalRecords := totalBytes / AvgRecordSizeBytes
recordsPerWorker := totalRecords / int64(Workers)
remainder := totalRecords % int64(Workers)
// 计算每个文件能容纳的记录数
recordsPerFile := MaxFileSizeBytes / AvgRecordSizeBytes
totalFiles := int(math.Ceil(float64(totalRecords) / float64(recordsPerFile)))
log.Printf("总记录数: %d", totalRecords)
log.Printf("每个文件记录数: %d", recordsPerFile)
log.Printf("总文件数: %d", totalFiles)
// 创建通道
recordsChan := make(chan Record, BufferChannelSize)
filePathChan := make(chan string, totalFiles) // 足够的缓冲区存放所有文件路径
var wg sync.WaitGroup
// 启动写入器
wg.Add(1)
go writer(&wg, recordsChan, filePathChan)
// 生成所有文件路径
go func() {
for i := 1; i <= totalFiles; i++ {
filePath := filepath.Join(OutputDir, "test_data_part_"+strconv.Itoa(i)+".json")
filePathChan <- filePath
}
close(filePathChan) // 所有文件路径发送完毕后关闭通道
log.Println("所有文件路径已生成并发送到通道")
}()
// 启动生成器
var genWg sync.WaitGroup
for i := 0; i < Workers; i++ {
genWg.Add(1)
go func(workerID int, numRecords int64) {
defer genWg.Done()
for j := int64(0); j < numRecords; j++ {
record := generateRecord()
recordsChan <- record
if (j+1)%ProgressGenInterval == 0 {
log.Printf("生成器 %d 已生成 %d 条记录", workerID, j+1)
}
}
log.Printf("生成器 %d 完成生成 %d 条记录", workerID, numRecords)
}(i+1, recordsPerWorker)
}
// 处理余数
if remainder > 0 {
genWg.Add(1)
go func() {
defer genWg.Done()
for j := int64(0); j < remainder; j++ {
record := generateRecord()
recordsChan <- record
}
log.Printf("生成器 %d 完成生成余数 %d 条记录", Workers+1, remainder)
}()
}
// 等待生成器完成
genWg.Wait()
close(recordsChan)
log.Println("所有生成器已完成,关闭 recordsChan")
// 等待写入器完成
wg.Wait()
elapsed := time.Since(start)
log.Printf("数据生成完成,总耗时: %s", elapsed)
}
// 导入 Elasticsearch 的函数
func importToES() {
start := time.Now()
log.Println("开始导入数据到 Elasticsearch...")
client, err := createESClient()
if err != nil {
log.Fatalf("无法连接 Elasticsearch: %v", err)
}
// 获取所有 JSON 文件
files, err := getJSONFiles(OutputDir)
if err != nil {
log.Fatalf("无法读取 JSON 文件: %v", err)
}
if len(files) == 0 {
log.Fatalf("没有找到任何 JSON 文件在目录: %s", OutputDir)
}
log.Printf("找到 %d 个 JSON 文件用于导入", len(files))
recordsChan := make(chan string, 10000)
var wg sync.WaitGroup
// 启动读取文件的 goroutine
go func() {
for _, file := range files {
log.Printf("开始读取文件: %s", file)
if err := readRecords(file, recordsChan); err != nil {
log.Printf("读取文件 %s 失败: %v", file, err)
}
log.Printf("完成读取文件: %s", file)
}
close(recordsChan)
log.Println("所有文件读取完成,关闭 recordsChan")
}()
// 启动 Worker
for i := 0; i < WorkerCount; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
var bulkRequest *elastic.BulkService
bulkRequest = client.Bulk()
count := 0
totalImported := 0
for record := range recordsChan {
var doc map[string]interface{}
if err := json.Unmarshal([]byte(record), &doc); err != nil {
log.Printf("Worker %d 解析 JSON 失败: %v", workerID, err)
continue
}
// 确保 user_id 字段存在且为 string 类型
userID, ok := doc["user_id"].(string)
if !ok {
log.Printf("Worker %d 无法获取有效的 user_id", workerID)
continue
}
req := elastic.NewBulkIndexRequest().Index(IndexName).Id(userID).Doc(doc)
bulkRequest = bulkRequest.Add(req)
count++
if count >= BulkSize {
// 执行 Bulk 请求
if err := executeBulk(client, bulkRequest); err != nil {
log.Printf("Worker %d 执行 Bulk 请求失败: %v", workerID, err)
} else {
totalImported += count
log.Printf("Worker %d 成功导入 %d 条记录,累计导入 %d 条", workerID, count, totalImported)
}
// 重置 Bulk 请求
bulkRequest = client.Bulk()
count = 0
}
if totalImported%100000 == 0 && totalImported > 0 {
log.Printf("Worker %d 已累计导入 %d 条记录", workerID, totalImported)
}
}
// 执行剩余的 Bulk 请求
if count > 0 {
if err := executeBulk(client, bulkRequest); err != nil {
log.Printf("Worker %d 执行 Bulk 请求失败: %v", workerID, err)
} else {
totalImported += count
log.Printf("Worker %d 成功导入剩余的 %d 条记录,累计导入 %d 条", workerID, count, totalImported)
}
}
log.Printf("Worker %d 完成数据导入,总导入 %d 条记录", workerID, totalImported)
}(i + 1)
}
// 等待所有 Worker 完成
wg.Wait()
elapsed := time.Since(start)
log.Printf("数据导入完成,总耗时: %s", elapsed)
log.Println("所有数据已成功导入 Elasticsearch")
}
func main() {
// 配置日志输出格式
log.SetFlags(log.LstdFlags | log.Lshortfile)
// 根据功能分离,先生成数据,再导入ES
generateData()
importToES()
}
主要更新和优化
-
增加详细的日志输出:
- 在数据读取、写入和导入的各个关键步骤添加了日志输出,帮助跟踪进度和排查问题。
- 使用
log.Printf
记录关键事件,如开始和完成读取文件、生成器生成记录、写入文件以及每个 Worker 导入的进度。
-
调整 Bulk 请求的超时时间:
- 将
executeBulk
函数中的上下文超时时间从 60 秒增加到 120 秒,以防止由于大量数据导致的请求超时。
- 将
-
校验和报错加强:
- 在
readRecords
函数中记录每个文件读取的进度和完成情况。 - 在
importToES
的 Worker 中记录每次 Bulk 请求的成功和失败情况,以及累计导入的记录数。
- 在
-
优化并发和通道处理:
- 确保
filePathChan
在所有文件路径发送完毕后关闭,以防止写入器阻塞。 - 在生成数据的过程中,确保所有生成器完成后才关闭
recordsChan
。
- 确保
-
日志格式配置:
- 在
main
函数中配置日志输出格式,以包括时间戳和文件行号,便于调试。
- 在
运行和监控
运行此程序时,您将看到类似以下的日志输出:
2024/04/27 10:00:00 main.go:123: 开始生成数据...
2024/04/27 10:00:00 main.go:133: 创建输出目录: output_data
2024/04/27 10:00:00 main.go:142: 总记录数: 21474836480
2024/04/27 10:00:00 main.go:143: 每个文件记录数: 1048576
2024/04/27 10:00:00 main.go:144: 总文件数: 20480
2024/04/27 10:00:00 main.go:156: 开始写入文件: output_data/test_data_part_1.json
2024/04/27 10:00:05 main.go:172: 生成器 1 已生成 100000 条记录
...
2024/04/27 10:02:00 main.go:198: 数据生成完成,总耗时: 2m0s
2024/04/27 10:02:00 main.go:201: 开始导入数据到 Elasticsearch...
2024/04/27 10:02:00 main.go:61: Elasticsearch 客户端连接成功
2024/04/27 10:02:00 main.go:217: 找到 20480 个 JSON 文件用于导入
2024/04/27 10:02:00 main.go:227: 开始读取文件: output_data/test_data_part_1.json
2024/04/27 10:02:30 main.go:237: 完成读取文件: output_data/test_data_part_1.json,共读取 1000000 条记录
...
2024/04/27 10:05:00 main.go:259: Worker 1 成功导入 500 条记录,累计导入 500 条
...
2024/04/27 10:10:00 main.go:291: Worker 1 完成数据导入,总导入 500000 条记录
...
2024/04/27 10:20:00 main.go:303: 数据导入完成,总耗时: 18m20s
2024/04/27 10:20:00 main.go:304: 所有数据已成功导入 Elasticsearch
通过这些日志,您可以:
- 跟踪数据生成过程: 了解每个生成器的进度和完成情况。
- 监控文件写入过程: 确认每个文件的写入进度和完成状态。
- 监测数据导入过程: 追踪每个 Worker 导入的记录数,以及每次 Bulk 请求的处理情况。
进一步优化建议
-
调整 Bulk 请求大小和 Worker 数量:
- 根据 Elasticsearch 集群的性能,您可以尝试增大或减小
BulkSize
,以及调整WorkerCount
以优化导入速度。 - 例如,将
BulkSize
增加到 1000 或 2000,观察其对导入性能的影响。
- 根据 Elasticsearch 集群的性能,您可以尝试增大或减小
-
监控 Elasticsearch 性能:
- 在导入过程中,监控 Elasticsearch 集群的 CPU、内存、磁盘 I/O 和网络状况,确保集群能够处理高负载。
- 使用 Elasticsearch 提供的监控工具,比如 Kibana 的监控插件,来实时查看集群的健康状态和性能指标。
-
处理导入过程中可能出现的错误:
- 如果遇到频繁的 Bulk 请求错误,可以根据错误日志分析原因,比如索引模板不匹配、映射冲突等,并进行相应调整。
-
优化索引设置:
- 临时关闭刷新间隔: 在大规模导入时,可以暂时关闭自动刷新,以提高导入性能。导入完成后,再手动触发刷新。
// 在导入前临时调整索引设置 _, err := client.IndexPutSettings(IndexName).BodyJson(map[string]interface{}{ "index": map[string]interface{}{ "refresh_interval": "-1", }, }).Do(context.Background()) if err != nil { log.Fatalf("无法更新索引设置: %v", err) } // 导入完成后恢复刷新间隔 _, err = client.IndexPutSettings(IndexName).BodyJson(map[string]interface{}{ "index": map[string]interface{}{ "refresh_interval": "1s", }, }).Do(context.Background()) if err != nil { log.Fatalf("无法恢复索引设置: %v", err) }
- 增加主分片数量: 根据数据量和查询需求,调整索引的主分片数量,以优化性能。
- 临时关闭刷新间隔: 在大规模导入时,可以暂时关闭自动刷新,以提高导入性能。导入完成后,再手动触发刷新。
-
分批导入:
- 如果数据量过大,可以分批次导入,每次导入一部分数据,观察其对集群性能的影响。
总结
通过上述更新和优化,能够更好地监控和管理数据生成与导入的过程,及时发现并解决可能的问题。如果导入过程仍然出现卡顿,请查看日志中的详细信息,以便进一步分析和定位问题。