我正在将Julia与软件包ZMQ一起使用。

我已成功使用ZMQ模式Dealer / Router发送和接收消息。

这是我收到消息的方式:

dataRecv = bytestring(ZMQ.recv(sockRouter))


但这是阻碍。我需要的是非阻塞。

在带有ZeroMQ的c ++中,我们可以执行以下操作:

zmq_msg_recv(&message, socket, 0); // blocking
zmq_msg_recv(&message, socket, ZMQ_DONTWAIT); // non-blocking


在茱莉亚,我找到了一个关键词:ZMQ.ZMQ_DONTWAIT,但我不知道如何使用它。我已经试过像这样:

dataRecv = bytestring(ZMQ.recv(sockRouter, ZMQ.ZMQ_DONTWAIT))


但我得到一个错误:


'recv'没有与recv(:: Socket,:: Int64)匹配的方法


那么在Julia中使用非阻塞模式是不可能的吗?

问题1)
我提出了一个问题here

如果我是对的,那么这个问题的答案是关于多线程+阻塞的。

我知道这可能有效,但是我更喜欢使用单线程+非阻塞。

问题2)
@Chisholm感谢您给我source of ZMQ.jl
但是我做了这样的测试:

dataRecv = bytestring(ZMQ.recv(sockRouter))
println("after recv")


如果我执行上述代码,它将在recv处阻塞。
换句话说,在我发送消息之前,它不会打印“ recv之后”。

因此,我认为这完全是一种阻止模式。

最佳答案

查看ZMQ.jl处的代码,似乎阻塞是由wait之后的:zmq_msg_recv引起的,因此这是recv的另一个定义,称为pollrecv,可以在Main模块中定义(更改是不必要的):

function pollrecv(socket::ZMQ.Socket,zmsg::Message)
    rc = -1
    while true
        rc = ccall((:zmq_msg_recv, ZMQ.zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
                    &zmsg, socket.data, ZMQ.ZMQ_DONTWAIT)
        if rc == -1
            if !(ZMQ.zmq_errno() == Base.Libc.EAGAIN)
                throw(ZMQ.StateError(ZMQ.jl_zmq_error_str()))
            end
            return false
        else
            ZMQ.get_events(socket) != 0 && notify(socket)
            break
        end
    end
    return true
end


您可以通过以下方式使用此功能:

msg = Message()
while !pollrecv(s1,msg)
    sleep(3)
    println("ZZzzzz...")
end
out = convert(IOStream,msg)
println(takebuf_string(out))
close(msg)
ZMQ.send(s1,"response important for next receive")


当然,ZMQ.jl使用轮询实现阻塞,并且while应该用其他处理代替。

关于zeromq - zeromq + julia:如何设置标志,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33674604/

10-12 01:03