本文介绍了0MQ:如何在一个线程安全的方式使用ZeroMQ?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我阅读,我偶然发现了以下内容:



and later on:

I also understood that the ZeroMQ Context is threadsafe.

If a class registers for an event of a another class, in .Net, this event might be invoked from a different thread than the thread the listener was created on.

I think there are only two options to be able to dispatch something via ZeroMQ-Sockets from within an eventhandler:

  • Synchronize the eventhandler-invoking-thread to the thread the ZeroMQ-Socket was created in
  • Create a new ZeroMQ-Socket / get the exisiting ZeroMQ-Socket for the thread within the eventhandler by using the threadsafe ZeroMQ-Context

It seems that the 0MQ-Guide to discourage the first one and I don't think that creating a new ZeroMq-Socket for each thread is performant / the way to go.

My Question:
What is the correct pattern (the way it is meant to be) to publish messages via 0MQ from within an eventhandler?

Also, did the authors of the guide have the ZeroMQ-Binding for .Net in mind when they wrote:

Here is some samplecode to emphasize my problem/question:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally:
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
解决方案

In .net framework v4 and up you can use concurrent collection to solve this problem. Namely Producer-Consumer pattern. Multiple threads (handlers) can send data to a thread-safe queue and just single thread consumes data from the queue and sends it using the socket.

Here is the idea:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;

这篇关于0MQ:如何在一个线程安全的方式使用ZeroMQ?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-23 00:53
查看更多