我注意到,如果要将数据批量发送到 Elasticsearch,可以使用 BulkIndexer,正如 Elastigo 文档中所述。



以下是使用 Elastigo 进行批量插入的代码:

// Package-level Elastigo handles shared by the bulk-insert code.
var (
    // c_es is the Elastigo connection, created with the library defaults.
    c_es = elastigo.NewConn()

    // indexer is the bulk indexer built on c_es.
    // NOTE(review): 50 is presumably the maximum number of concurrent
    // connections; confirm against the Elastigo documentation.
    indexer = c_es.NewBulkIndexer(50)
)

// insertInBulkElastic replaces the Sender of the package-level bulk indexer
// with a function that POSTs each buffered batch to the /_bulk endpoint through
// c_es and prints the outcome to standard output. The error returned by
// DoCommand is passed back to the indexer unchanged.
func insertInBulkElastic() {
    indexer.Sender = func(payload *bytes.Buffer) error {
        // payload is the buffer of documents about to be written.
        body, err := c_es.DoCommand("POST", "/_bulk", nil, payload)
        if err == nil {
            fmt.Println("The data was inserted successfullly to elastic search")
            return err
        }

        // Failure path: print the raw response first, then the error itself.
        // handle it better than this
        fmt.Println("Error", string(body))
        fmt.Println("Error", err)
        return err
    }
}

有人知道如何在 Go 中使用 olivere/elastic 发送批量请求吗?

谢谢

最佳答案

这是一个在 Go 中使用 olivere/elastic 的可运行示例。您可以在 olivere/elastic 的文档中阅读有关 BulkProcessor 的更多信息。

希望这能有所帮助 :)

package main

import (
    "context"
    "log"
    "time"

    elastic "gopkg.in/olivere/elastic.v5"
)

// main demonstrates bulk indexing with olivere/elastic. It connects to the
// Elasticsearch node at http://127.0.0.1:9200, makes sure "my_index" exists,
// installs a keyword mapping for the "name" field, starts a BulkProcessor,
// queues a single document, and then flushes and closes the processor so the
// queued request is committed before the program exits. Any setup or shutdown
// error terminates the program with a log message.
func main() {
    ctx := context.Background()

    options := []elastic.ClientOptionFunc{
        elastic.SetHealthcheck(true),
        elastic.SetHealthcheckTimeout(20 * time.Second),
        elastic.SetSniff(false),
        elastic.SetHealthcheckInterval(30 * time.Second),
        elastic.SetURL("http://127.0.0.1:9200"),
        elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
    }
    client, err := elastic.NewClient(options...)
    if err != nil {
        log.Fatalf("creating elasticsearch client: %v", err)
    }

    // Ensure the index exists before touching its mapping.
    exists, err := client.IndexExists("my_index").Do(ctx)
    if err != nil {
        log.Fatalf("checking whether my_index exists: %v", err)
    }
    if !exists {
        if _, err := client.CreateIndex("my_index").Do(ctx); err != nil {
            log.Fatalf("creating my_index: %v", err)
        }
    }

    // Each property is described by an object carrying its "type"; the
    // document type must be named as well and matches the one used for the
    // bulk request below.
    mapping := map[string]interface{}{
        "properties": map[string]interface{}{
            "name": map[string]interface{}{
                "type": "keyword",
            },
        },
    }
    if _, err := client.PutMapping().Index("my_index").Type("type").BodyJson(mapping).Do(ctx); err != nil {
        log.Fatalf("putting mapping for my_index: %v", err)
    }

    // Create a new bulk processor from the client. The error must be checked:
    // on failure there is no processor to add requests to.
    bulkProcessor, err := elastic.NewBulkProcessorService(client).
        Workers(5).
        BulkActions(1000).
        FlushInterval(1 * time.Second).
        After(after).
        Do(ctx)
    if err != nil {
        log.Fatalf("starting bulk processor: %v", err)
    }

    // From here on the bulk processor can be reused for the entire app.
    myDoc := struct {
        // The tag makes the serialized field name match the "name" mapping.
        Name string `json:"name"`
    }{
        Name: "jack",
    }
    req := elastic.NewBulkIndexRequest()
    req.Index("my_index").Type("type").Id("my_doc_id").Doc(myDoc)

    // Use the Add method to hand the request to the processor.
    bulkProcessor.Add(req)

    // Commit whatever is still queued and shut the workers down, instead of
    // sleeping and hoping the flush interval has elapsed.
    if err := bulkProcessor.Flush(); err != nil {
        log.Fatalf("flushing bulk processor: %v", err)
    }
    if err := bulkProcessor.Close(); err != nil {
        log.Fatalf("closing bulk processor: %v", err)
    }
}

// after is the callback registered on the bulk processor through After(after).
// It logs the outcome of one bulk commit.
//
// executionID identifies the commit, requests are the requests that were part
// of it, response is the bulk response (checked for nil before use), and err is
// the commit error, if any. The success message is only logged when the commit
// returned no error and the response reports no failed items.
func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    if err != nil {
        log.Printf("bulk commit failed, err: %v\n", err)
        // Stop here so a failed commit is not also reported as successful.
        return
    }

    // A bulk call can succeed as a whole while individual items are rejected.
    if response != nil && response.Errors {
        log.Printf("bulk commit partially failed, %d of %d requests rejected\n", len(response.Failed()), len(requests))
        return
    }

    // do what ever you want in case bulk commit success
    log.Printf("commit successfully, len(requests)=%d\n", len(requests))
}

关于“elasticsearch - 在 Golang 中使用 olivere 软件包替代 Elastigo 的 BulkIndexer”,我们在 Stack Overflow 上找到一个类似的问题:https://stackoverflow.com/questions/53424391/

10-11 06:25