我有以下情况。
我有一个接收消息的队列。收到消息后,将创建一个新线程来处理它。
该线程接收消息并将其添加到集合中。然后,它检查集合中是否包含100个项目,是否将其发送到其他地方并清除集合。
我无法使用常规列表,因为我修改了集合,枚举无法继续发生错误。所以我需要使用线程安全集合。
不过,我担心的是,一个线程将其写入第100个项目,同时将其发送到其他地方,而另一个线程将其添加到集合中。使它成为101个项目,触发第100个线程的线程将其清除,而我丢失了一个项目。
我不能使用并发包,因为它并不清楚,我不能迭代包并一一删除(1),因为消息可能会传入并添加得比删除快,并且永远不会结束。
ConcurrentStack有一个明确的例子,但是在这种情况下可以吗?
一些代码演示了我的意思,HandleMeasurementMessage发生在每个消息的新线程上。
private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
/* Do a bunch of stuff to msg */
EventHubDataBatch.Push(eventHubDatum);
if(EventHubDataBatch.Count == 100)
{
/* Send them off*/
EventHubDatabatch.Clear();
}
}
奇怪的是,仅当我未通过VS2015中的调试器运行枚举时,才会发生枚举被修改的问题。该程序运行一个小时左右就可以了。如果关闭调试器,则会出现这些枚举错误,这就是为什么我尝试切换到线程安全集合的原因。我只是不确定哪一个合适。
调用HandleMeasurementMessage的代码
_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
{
try
{
HandleMeasurementMessage(msg);
}
catch (Exception ex)
{
/* Logging stuff*/
}
}));
最佳答案
我只会使用这样的简单锁:
private static readonly List<EventHubDatum> EventHubDataBatch = new List<EventHubDatum>();
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
/* Do a bunch of stuff to msg */
EventHubDatum[] toSend = null;
lock (EventHubDataBatch) {
EventHubDataBatch.Add(eventHubDatum);
if (EventHubDataBatch.Count == 100) {
// copy to local
toSend = EventHubDataBatch.ToArray();
EventHubDataBatch.Clear();
}
}
if (toSend != null) {
/* Send them off*/
}
}
锁定此处非常简短,因此不会以任何明显的方式影响您的情况。请注意,如果有100个项目-我们会将它们复制到本地数组并清除源列表,以在“发送它们”操作期间不持有锁,这可能会花费很长时间。