demo地址:BulkAll

批量导入

实现目标:想要使用ElasticSearch的 .Net Api客户端NEST批量导入数据,并发异步高效的批量导入

NEST提供了BulkAll

不废话,上代码

            const int size = 1000;
var tokenSource = new CancellationTokenSource(); var observableBulk = elasticClient.BulkAll(list, f => f
.MaxDegreeOfParallelism(8)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(size)
.RefreshOnCompleted()
.Index(indexName)
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
, tokenSource.Token); var countdownEvent = new CountdownEvent(1); Exception exception = null; var bulkAllObserver = new BulkAllObserver(); observableBulk.Subscribe(bulkAllObserver); countdownEvent.Wait(tokenSource.Token);

如果想要对处理导入过程进行监控可以这么替换BulkAllObserver

               var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
},
onError: ex =>
{
WriteLine("BulkAll Error : {0}", ex);
exception = ex;
countdownEvent.Signal();
},
() =>
{
WriteLine("BulkAll Finished");
countdownEvent.Signal();
});

还可以使用C#的local function特性,如下所示

            void OnCompleted()
{
WriteLine("BulkAll Finished");
countdownEvent.Signal();
} var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
},
onError: ex =>
{
WriteLine("BulkAll Error : {0}", ex);
exception = ex;
countdownEvent.Signal();
},
OnCompleted);

完成demo,请点击 BulkAll 查看

04-30 05:11