我注意到,如果要将数据批量发送到 Elasticsearch,可以使用 BulkIndexer,正如 Elastigo 文档中所述。
下面是使用 Elastigo 进行批量插入的代码:
// c_es is the package-level elastigo connection; the bulk Sender installed by
// insertInBulkElastic issues its "/_bulk" request through it.
var c_es = elastigo.NewConn()
// indexer is the bulk indexer created from c_es.
// NOTE(review): the argument 50 is presumably the maximum number of concurrent
// connections the indexer may use; confirm against the elastigo documentation.
var indexer = c_es.NewBulkIndexer(50)
// insertInBulkElastic installs a custom Sender on the package-level bulk
// indexer so that the outcome of every bulk write is printed to stdout.
func insertInBulkElastic() {
	// Sender receives the buffer of documents that is about to be written.
	indexer.Sender = func(docs *bytes.Buffer) error {
		body, sendErr := c_es.DoCommand("POST", "/_bulk", nil, docs)
		if sendErr != nil {
			// Print the raw response first, then the error itself; the error
			// is still handed back to the indexer below.
			fmt.Println("Error", string(body))
			fmt.Println("Error", sendErr)
		} else {
			fmt.Println("The data was inserted successfullly to elastic search")
		}
		return sendErr
	}
}
有人知道如何在 Go 中使用 olivere/elastic 发送批量请求吗?
谢谢
最佳答案
下面是在 Go 中使用 olivere/elastic 的可运行示例。
有关 BulkProcessor 的更多信息,请参阅 olivere/elastic 项目 Wiki 中的 BulkProcessor 页面。
希望对您有所帮助 :)
package main
import (
"context"
"log"
"time"
elastic "gopkg.in/olivere/elastic.v5"
)
// main demonstrates bulk indexing with the olivere/elastic BulkProcessor.
//
// It connects to a local Elasticsearch node, makes sure "my_index" and its
// mapping exist, queues one document on a bulk processor and flushes it.
// Any setup failure aborts the program with a panic, matching the error
// handling style used throughout this example.
func main() {
	ctx := context.Background()

	options := []elastic.ClientOptionFunc{
		elastic.SetHealthcheck(true),
		elastic.SetHealthcheckTimeout(20 * time.Second),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(30 * time.Second),
		elastic.SetURL("http://127.0.0.1:9200"),
		elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
	}
	client, err := elastic.NewClient(options...)
	if err != nil {
		panic(err)
	}
	// Stop the client's background goroutines (health checks) on exit.
	defer client.Stop()

	// Ensure the index exists.
	exists, err := client.IndexExists("my_index").Do(ctx)
	if err != nil {
		panic(err)
	}
	if !exists {
		if _, err := client.CreateIndex("my_index").Do(ctx); err != nil {
			panic(err)
		}
	}

	// Register the mapping. The type must be named explicitly, and each
	// property needs a {"type": ...} object rather than a bare string;
	// the result is checked instead of being silently dropped.
	mapping := map[string]interface{}{
		"properties": map[string]interface{}{
			"name": map[string]interface{}{
				"type": "keyword",
			},
		},
	}
	if _, err := client.PutMapping().
		Index("my_index").
		Type("type").
		BodyJson(mapping).
		Do(ctx); err != nil {
		panic(err)
	}

	// Create the bulk processor; it can be reused for the whole application.
	bulkProcessor, err := elastic.NewBulkProcessorService(client).
		Workers(5).
		BulkActions(1000).
		FlushInterval(1 * time.Second).
		After(after).
		Do(ctx)
	if err != nil {
		// Without this check a failed start would leave bulkProcessor nil
		// and the Add call below would dereference it.
		panic(err)
	}

	// The JSON tag makes the document field match the "name" property that
	// was mapped above.
	myDoc := struct {
		Name string `json:"name"`
	}{
		Name: "jack",
	}
	req := elastic.NewBulkIndexRequest().
		Index("my_index").
		Type("type").
		Id("my_doc_id").
		Doc(myDoc)

	// Queue the request on the processor.
	bulkProcessor.Add(req)

	// Push queued requests out explicitly instead of sleeping and hoping the
	// flush interval fires, then shut the processor down.
	if err := bulkProcessor.Flush(); err != nil {
		panic(err)
	}
	if err := bulkProcessor.Close(); err != nil {
		panic(err)
	}
}
// after is the callback registered with the bulk processor via After; it
// logs the outcome of one bulk commit.
//
// Parameters:
//   - executionID: identifier of the commit (unused here).
//   - requests: the requests that were part of the commit.
//   - response: the bulk response; may be nil when err is non-nil.
//   - err: the error of the commit as a whole, if any.
//
// Success is only logged when the commit had no error and the response
// reports no failed items.
func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		log.Printf("bulk commit failed, err: %v\n", err)
		// Do not fall through to the success message.
		return
	}

	// A bulk call can succeed as a whole while individual items fail.
	if response != nil && response.Errors {
		failed := response.Failed()
		for _, item := range failed {
			log.Printf("bulk item failed, index=%s id=%s status=%d error=%+v\n",
				item.Index, item.Id, item.Status, item.Error)
		}
		log.Printf("bulk commit partially failed, failed=%d len(requests)=%d\n", len(failed), len(requests))
		return
	}

	// Do whatever is needed once the bulk commit succeeded.
	log.Printf("commit successfully, len(requests)=%d\n", len(requests))
}
关于elasticsearch - Olivere软件包中的BulkIndexer用于Golang替换Elastigo,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53424391/