我有以下情况。

我有一个接收消息的队列。收到消息后,将创建一个新线程来处理它。

该线程接收消息并将其添加到集合中。然后,它检查集合中是否包含100个项目,是否将其发送到其他地方并清除集合。

我无法使用常规列表,因为我修改了集合,枚举无法继续发生错误。所以我需要使用线程安全集合。

不过,我担心的是,一个线程将其写入第100个项目,同时将其发送到其他地方,而另一个线程将其添加到集合中。使它成为101个项目,触发第100个线程的线程将其清除,而我丢失了一个项目。

我不能使用并发包,因为它并不清楚,我不能迭代包并一一删除(1),因为消息可能会传入并添加得比删除快,并且永远不会结束。

ConcurrentStack有一个明确的例子,但是在这种情况下可以吗?

一些代码演示了我的意思,HandleMeasurementMessage发生在每个消息的新线程上。

private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

   EventHubDataBatch.Push(eventHubDatum);

   if(EventHubDataBatch.Count == 100)
   {
      /* Send them off*/
      EventHubDatabatch.Clear();
   }
}


奇怪的是,仅当我未通过VS2015中的调试器运行枚举时,才会发生枚举被修改的问题。该程序运行一个小时左右就可以了。如果关闭调试器,则会出现这些枚举错误,这就是为什么我尝试切换到线程安全集合的原因。我只是不确定哪一个合适。

调用HandleMeasurementMessage的代码

_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
            {
                try
                {
                    HandleMeasurementMessage(msg);
                }
                catch (Exception ex)
                {
                    /* Logging stuff*/
                }
            }));

最佳答案

我只会使用这样的简单锁:

private static readonly List<EventHubDatum> EventHubDataBatch = new List<EventHubDatum>();
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    EventHubDatum[] toSend = null;
    lock (EventHubDataBatch) {
        EventHubDataBatch.Add(eventHubDatum);

        if (EventHubDataBatch.Count == 100) {
            // copy to local
            toSend = EventHubDataBatch.ToArray();
            EventHubDataBatch.Clear();
        }
    }

    if (toSend != null) {
        /* Send them off*/
    }
}


锁定此处非常简短,因此不会以任何明显的方式影响您的情况。请注意,如果有100个项目-我们会将它们复制到本地数组并清除源列表,以在“发送它们”操作期间不持有锁,这可能会花费很长时间。

07-24 09:44
查看更多