我有对How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?的跟进
这个问题要求使用XSUB和XPUB的C++代理。给出的答案本质上是下面引用的proxy main()函数。
我将此代理扩展到一个完整的工作示例,其中包括发布者和订阅者。问题是我的代码仅适用于经销商/路由器选项(如下面的注释所示)。使用下面的实际(未注释)XPUB / XSUB选项,订阅者不会收到消息。怎么了是否有消息接收调整?
代理不适用于XPUB / XSUB(注释中的有效经销商/路由器)
#include <zmq.hpp>
int main(int argc, char* argv[]) {
zmq::context_t ctx(1);
zmq::socket_t frontend(ctx, /*ZMQ_ROUTER*/ ZMQ_XSUB);
zmq::socket_t backend(ctx, /*ZMQ_DEALER*/ ZMQ_XPUB);
frontend.bind("tcp://*:5570");
backend.bind("tcp://*:5571");
zmq::proxy(frontend, backend, nullptr);
return 0;
}
订户不适用于ZMQ_SUB(注释中的有效经销商/路由器选项)
#include <iostream>
#include <zmq.hpp>
std::string GetStringFromMessage(const zmq::message_t& msg) {
char* tmp = new char[msg.size()+1];
memcpy(tmp,msg.data(),msg.size());
tmp[msg.size()] = '\0';
std::string rval(tmp);
delete[] tmp;
return rval;
}
int main(int argc, char* argv[]) {
zmq::context_t ctx(1);
zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_SUB);
socket.connect("tcp://localhost:5571");
while (true) {
zmq::message_t identity;
zmq::message_t message;
socket.recv(&identity);
socket.recv(&message);
std::string identityStr(GetStringFromMessage(identity));
std::string messageStr(GetStringFromMessage(message));
std::cout << "Identity: " << identityStr << std::endl;
std::cout << "Message: " << messageStr << std::endl;
}
}
发布者不适用于ZMQ_PUB(注释中的有效经销商/路由器选项)
#include <unistd.h>
#include <sstream>
#include <zmq.hpp>
int main (int argc, char* argv[])
{
// Context
zmq::context_t ctx(1);
// Create a socket and set its identity attribute
zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_PUB);
char identity[10] = {};
sprintf(identity, "%d", getpid());
socket.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
socket.connect("tcp://localhost:5570");
// Send some messages
unsigned int counter = 0;
while (true) {
std::ostringstream ss;
ss << "Message #" << counter << " from PID " << getpid();
socket.send(ss.str().c_str(),ss.str().length());
counter++;
sleep(1);
}
return 0;
}
最佳答案
在订阅者代码中,您尚未订阅接收来自发布者的消息。尝试添加以下行:
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
该行之前/之后:
socket.connect("tcp://localhost:5571");
在您的订户代码中