问题描述
似乎 XPUB/XSUB 套接字类型有一个难以解决的严重缺陷:
这是我对该中心节点的实现:
#include int main(){zmq::context_t 上下文(1);//传入的出版物来到这里zmq::socket_t sub(context, ZMQ_XSUB);sub.bind("ipc://subscriber.ipc");//发出的出版物从这里出去.zmq::socket_t pub(context, ZMQ_XPUB);pub.bind("ipc://publisher.ipc");zmq::proxy(sub, pub, nullptr);返回0;}
问题当然是慢木工综合症.如果我将新发布者连接到 XSUB 并发布一些消息,它们就会消失在空白中:
#include "zhelpers.hpp"int主(){//准备我们的上下文和发布者zmq::context_t 上下文(1);zmq::socket_t 发布者(上下文,ZMQ_PUB);Publisher.connect("ipc://subscriber.ipc");s_sendmore(出版商,B");s_send(出版商,消失在虚空中!!");返回0;}
但是,如果我在连接到 XSUB 后 sleep(1)
,它会神奇地工作:
#include "zhelpers.hpp"int主(){//准备我们的上下文和发布者zmq::context_t 上下文(1);zmq::socket_t 发布者(上下文,ZMQ_PUB);Publisher.connect("ipc://subscriber.ipc");睡眠(1);s_sendmore(出版商,B");s_send(出版商,神奇的作品!!");返回0;}
指南声称有一个简单的解决方案可以解决这种慢速加入者"综合症,但从来没有提供有效的同步 XSUB/XPUB 实现.经过大量搜索,看起来大多数人只是睡觉
ing,这真的很糟糕.
为什么一直没有解决这个问题?是否有任何已知的解决方法?我所有的谷歌查询只是让我回到指南......
我找到了一种解决方法
这是中心节点所需的代码:
#include int main(){zmq::context_t 上下文(1);//传入的出版物来到这里zmq::socket_t sub(context, ZMQ_PULL);sub.bind("ipc://subscriber.ipc");//发出的出版物从这里出去.zmq::socket_t pub(context, ZMQ_PUB);pub.bind("ipc://publisher.ipc");zmq::proxy(sub, pub, nullptr);返回0;}
然后对于发布商:
#include "zhelpers.hpp"int主(){//准备我们的上下文和发布者zmq::context_t 上下文(1);zmq::socket_t 发布者(上下文,ZMQ_PUSH);Publisher.connect("ipc://subscriber.ipc");s_sendmore(出版商,B");s_send(出版商,不睡觉!");返回0;}
这个解决方案似乎工作得相当好,我希望人们在看到它的任何缺点时提出意见.如果我遇到更好的答案,我会在这里发布.
It seems as though the XPUB/XSUB socket types have a serious flaw that is difficult to work around:
This is my implementation of that center node:
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
//Incoming publications come here
zmq::socket_t sub(context, ZMQ_XSUB);
sub.bind("ipc://subscriber.ipc");
//Outgoing publications go out through here.
zmq::socket_t pub(context, ZMQ_XPUB);
pub.bind("ipc://publisher.ipc");
zmq::proxy(sub, pub, nullptr);
return 0;
}
The problem is, of course, slow joiner syndrome. If I connect a new publisher to XSUB and publish some messages, they disappear into the void:
#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("ipc://subscriber.ipc");
s_sendmore (publisher, "B");
s_send (publisher, "Disappears into the void!!");
return 0;
}
However, if I sleep(1)
after connecting to XSUB, it magically works:
#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.connect("ipc://subscriber.ipc");
sleep(1);
s_sendmore (publisher, "B");
s_send (publisher, "Magically works!!");
return 0;
}
The Guide claims there is a simple solution to this "slow joiners" syndrome, but then never delivers a working synchronized XSUB/XPUB implementation. After much searching it looks like most people are just sleep
ing, which is really bad.
Why hasn't this ever been fixed? Are there any known workarounds? All my google queries just point me back to the guide...
I found one workaround here, and that is to use PUSH/PULL on the publisher side, and PUB/SUB on the subscriber side. The new topology looks like this:
This is the code you need for the center node:
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
//Incoming publications come here
zmq::socket_t sub(context, ZMQ_PULL);
sub.bind("ipc://subscriber.ipc");
//Outgoing publications go out through here.
zmq::socket_t pub(context, ZMQ_PUB);
pub.bind("ipc://publisher.ipc");
zmq::proxy(sub, pub, nullptr);
return 0;
}
And then for publishers:
#include "zhelpers.hpp"
int main () {
// Prepare our context and publisher
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUSH);
publisher.connect("ipc://subscriber.ipc");
s_sendmore (publisher, "B");
s_send (publisher, "No sleep!");
return 0;
}
This solution seems to work fairly well, and I hope people chime in if they see any downsides to it. If I come across a better answer, I'll post here.
这篇关于ZeroMQ XPUB/XSUB 严重缺陷?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!