XPUB/XSUB套接字类型似乎有一个严重的缺陷,很难解决:
这是我对该中心节点的实现:
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
//Incoming publications come here
zmq::socket_t sub(context, ZMQ_XSUB);
sub.bind("ipc://subscriber.ipc");
//Outgoing publications go out through here.
zmq::socket_t pub(context, ZMQ_XPUB);
pub.bind("ipc://publisher.ipc");
zmq::proxy(sub, pub, nullptr);
return 0;
}
问题当然是慢木匠综合症。如果我将新的发布者连接到XSUB并发布一些消息,它们将消失在空白处:
#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("ipc://subscriber.ipc");
s_sendmore (publisher, "B");
s_send (publisher, "Disappears into the void!!");
return 0;
}
但是,如果我在连接到XSUB之后使用
sleep(1)
,则可以神奇地工作:#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("ipc://subscriber.ipc");
sleep(1);
s_sendmore (publisher, "B");
s_send (publisher, "Magically works!!");
return 0;
}
该指南声称,有一个针对这种“慢连接器”综合症的简单解决方案,但是从没有提供可工作的同步XSUB/XPUB实现。经过大量搜索后,看来大多数人都只是
sleep
ing,这确实很糟糕。为什么从来没有解决过这个问题?有任何已知的解决方法吗?我所有的Google查询都使我回到指南中。
最佳答案
我找到了一种解决方法here,那就是在发布者端使用PUSH/PULL,在订阅者端使用PUB/SUB。新的拓扑如下所示:
这是中心节点所需的代码:
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
//Incoming publications come here
zmq::socket_t sub(context, ZMQ_PULL);
sub.bind("ipc://subscriber.ipc");
//Outgoing publications go out through here.
zmq::socket_t pub(context, ZMQ_PUB);
pub.bind("ipc://publisher.ipc");
zmq::proxy(sub, pub, nullptr);
return 0;
}
然后对于发布者:
#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUSH);
publisher.connect("ipc://subscriber.ipc");
s_sendmore (publisher, "B");
s_send (publisher, "No sleep!");
return 0;
}
该解决方案似乎效果很好,并且我希望人们看到它的任何缺点后都可以听到。如果我有更好的答案,我会在这里发布。