Question
I am working on speeding up some processes that publish bulk sets of records (mostly in the millions) to Elasticsearch. In my C# code I have already implemented a multi-threaded solution using Dataflow, as sketched below:
var fetchRecords = new TransformBlock<?, ?>(input => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));
fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });
fetchRecords.Post("Start");
And this is the bulk request call I want to implement:
public IBulkResponse sendBulkRequest(List<?> records)
{
    lock (SomeStaticObject)
    {
        // Execute several new threads to send records in bulk
    }
}
My question is about the practicality of executing additional threads within a lock that is part of a Dataflow pipeline.
Is this OK? Could I run into problems with performance, execution, cache/memory misses, etc.?
Any insight would be gladly accepted.
Recommended answer
You may want to use BulkAll here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example:
void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);
    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);
    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex =>
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () =>
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this
    countdownEvent.Wait();

    if (exception != null)
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i);
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;

    public int Id { get; set; }
}
If you don't need to do anything special in the observer, you can use the .Wait() method on the observable:
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    )
    .Wait(
        TimeSpan.FromHours(1),
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
    );
There are observable methods for BulkAll, ScrollAll and Reindex (although there is also ReindexOnServer, which reindexes within Elasticsearch and maps to the Reindex API; the Reindex method predates it).
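If you would rather stay with Dataflow, the two knobs used with BulkAll above (batch size and degree of parallelism) can be approximated with a BatchBlock feeding an ActionBlock, which avoids both the lock and hand-started threads. The following is only a minimal sketch under stated assumptions: SendBulkAsync is a hypothetical stand-in that simulates a bulk request with a short delay, the records are plain integers, and the capacity values are arbitrary. It does not call Elasticsearch and is not how BulkAll is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BulkPipelineSketch
{
    // Hypothetical stand-in for a real bulk request: it only simulates latency.
    private static Task SendBulkAsync(int[] batch) => Task.Delay(10);

    // Returns the number of records handed to SendBulkAsync.
    public static async Task<int> RunAsync(IEnumerable<int> records, int batchSize, int parallelism)
    {
        var sent = 0;

        // Groups single records into arrays of batchSize (the final batch may be smaller).
        // BoundedCapacity (an arbitrary value here) limits how many records are buffered.
        var batcher = new BatchBlock<int>(batchSize, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = batchSize * parallelism * 2
        });

        // Processes up to `parallelism` batches at once; no lock or manual threads needed.
        var sender = new ActionBlock<int[]>(async batch =>
        {
            await SendBulkAsync(batch);
            Interlocked.Add(ref sent, batch.Length); // thread-safe counter update
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = parallelism,
            BoundedCapacity = parallelism * 2
        });

        batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var record in records)
        {
            // SendAsync waits while the pipeline is full, which gives backpressure.
            await batcher.SendAsync(record);
        }

        batcher.Complete();      // flushes the last partial batch
        await sender.Completion; // completes after every batch has been processed
        return sent;
    }

    public static void Main()
    {
        var sent = RunAsync(Enumerable.Range(0, 2050), batchSize: 500, parallelism: 4)
            .GetAwaiter().GetResult();
        Console.WriteLine($"Sent {sent} records");
    }
}
```

The design choice here is to let the block options bound concurrency: MaxDegreeOfParallelism caps the number of in-flight batches, and BoundedCapacity keeps a fast producer from buffering millions of records in memory.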