我正在使用czmq进行进程间通信。
有两个过程:
服务器,接收请求并发送答复,但也发送事件。
客户端发送请求并接收回复,但也监听事件。
我已经使用REQ / REP成功实现了“请求/答复”模式(以下详细信息)
现在,我要实现通知机制。
我希望我的服务器发送事件而不关心是否有人收到事件,也不希望被阻塞。
客户端侦听这些事件,但是如果崩溃,则对服务器不会有任何影响。
我相信PUB / SUB是最合适的模式,但是如果没有,请随时给我启发。
这是我的实现(从检查和日志中清除):
服务器发布事件
Server::eventIpcPublisher = zsock_new_pub("@ipc:///tmp/events.ipc");
void Server::OnEvent(uint8_t8* eventData, size_t dataSize) {
if (Server::eventIpcPublisher != nullptr) {
int retCode = zsock_send(Server::eventIpcPublisher, "b", eventData, dataSize);
}
客户端在专用线程中侦听它们
void Client::RegisterToEvents(const std::function<void(uint8_t*, size_t)>& callback) {
zsock_t* eventIpcSubscriber = zsock_new_sub(">ipc:///tmp/events.ipc", "");
listening = true;
while (listening) {
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_recv(eventIpcSubscriber, "b", &receptionBuffer, &receptionBufferSize);
--> NEVER REACHED <--
if (retCode == 0) {
callback(static_cast<uint8_t*>(receptionBuffer), receptionBufferSize);
}
}
zsock_destroy(&eventIpcSubscriber);
}
它不起作用:
服务器发送的返回码为0,就好像一切正常,
客户端什么都没有收到(在接收时被阻止)。
帮助将不胜感激,在此先感谢!
克里斯。
PS:这是我已经成功实施的REQ / REP(这里不需要帮助,只是为了理解)
客户端发送请求,然后等待答案。
uint8_t* MulticamApi::GetDatabase(size_t& sizeOfData) {
zsock_t* requestSocket = zsock_new_req(">ipc:///tmp/requests.ipc");
if (requestSocket == nullptr)
return nullptr;
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_send(requestSocket, "i", static_cast<int>(IpcComm_GetClipDbRequest));
if (retCode != 0) {
sizeOfData = 0;
return nullptr;
}
retCode = zsock_recv(requestSocket, "b", &receptionBuffer, &receptionBufferSize);
databaseData.reset(new MallocGuard(static_cast<void*>(receptionBuffer)));
sizeOfData = receptionBufferSize;
return static_cast<uint8_t*>(databaseData->Data());
}
服务器中的专用线程侦听请求,处理请求并进行回复。 (不用担心,删除是在其他地方处理的)
U32 Server::V_OnProcessing(U32 waitCode) {
protocolIpcWriter = zsock_new_rep("@ipc:///tmp/requests.ipc");
while (running) {
int receptionInt = 0;
int retCode = zsock_recv(protocolIpcWriter, "i", &receptionInt);
if ((retCode == 0) && (receptionInt == static_cast<int>(IpcComm_GetClipDbRequest))) {
GetDatabase();
}
sleep(1);
}
zsock_destroy(&protocolIpcWriter);
return 0;
}
void Server::GetDatabase() {
uint32_t dataSize = 10820 * 340;
uint8_t* data = new uint8_t[dataSize];
uint32_t nbBytesWritten = DbHelper::SaveDbToBuffer(data, dataSize);
int retCode = zsock_send(protocolIpcWriter, "b", data, nbBytesWritten);
}
最佳答案
我知道我的问题已经很老了,但为了记录起见,我从czmq
切换为基本zmq
api,一切顺利。我的一位同事在czmq
层上也遇到了问题,因此改用zmq
进行修复,因此绝对是我的建议。
关于c++ - czmq异步发送/接收(PUB/SUB),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35752517/