我正在使用czmq进行进程间通信。

有两个过程:


服务器,接收请求并发送答复,但也发送事件。
客户端发送请求并接收回复,但也监听事件。


我已经使用REQ / REP成功实现了“请求/答复”模式(以下详细信息)

现在,我要实现通知机制。
我希望我的服务器发送事件而不关心是否有人收到事件,也不希望被阻塞。
客户端侦听这些事件,但是如果崩溃,则对服务器不会有任何影响。
我相信PUB / SUB是最合适的模式,但是如果没有,请随时给我启发。

这是我的实现(从检查和日志中清除):

服务器发布事件

Server::eventIpcPublisher = zsock_new_pub("@ipc:///tmp/events.ipc");

void Server::OnEvent(uint8_t8* eventData, size_t dataSize) {
  if (Server::eventIpcPublisher != nullptr) {
    int retCode = zsock_send(Server::eventIpcPublisher, "b", eventData, dataSize);
}


客户端在专用线程中侦听它们

void Client::RegisterToEvents(const std::function<void(uint8_t*, size_t)>& callback) {
  zsock_t* eventIpcSubscriber = zsock_new_sub(">ipc:///tmp/events.ipc", "");
  listening = true;
  while (listening) {
    byte* receptionBuffer;
    size_t receptionBufferSize;
    int retCode = zsock_recv(eventIpcSubscriber, "b", &receptionBuffer, &receptionBufferSize);
    --> NEVER REACHED <--
    if (retCode == 0) {
      callback(static_cast<uint8_t*>(receptionBuffer), receptionBufferSize);
    }
  }
  zsock_destroy(&eventIpcSubscriber);
}


它不起作用:


服务器发送的返回码为0,就好像一切正​​常,
客户端什么都没有收到(在接收时被阻止)。


帮助将不胜感激,在此先感谢!

克里斯。

PS:这是我已经成功实施的REQ / REP(这里不需要帮助,只是为了理解)

客户端发送请求,然后等待答案。

uint8_t* MulticamApi::GetDatabase(size_t& sizeOfData) {
  zsock_t* requestSocket = zsock_new_req(">ipc:///tmp/requests.ipc");
  if (requestSocket == nullptr)
    return nullptr;
  byte* receptionBuffer;
  size_t receptionBufferSize;
  int retCode = zsock_send(requestSocket, "i", static_cast<int>(IpcComm_GetClipDbRequest));
  if (retCode != 0) {
    sizeOfData = 0;
    return nullptr;
  }
  retCode = zsock_recv(requestSocket, "b", &receptionBuffer, &receptionBufferSize);
  databaseData.reset(new MallocGuard(static_cast<void*>(receptionBuffer)));
  sizeOfData = receptionBufferSize;
  return static_cast<uint8_t*>(databaseData->Data());
}


服务器中的专用线程侦听请求,处理请求并进行回复。 (不用担心,删除是在其他地方处理的)

U32 Server::V_OnProcessing(U32 waitCode) {
  protocolIpcWriter = zsock_new_rep("@ipc:///tmp/requests.ipc");
  while (running) {
    int receptionInt = 0;
    int retCode = zsock_recv(protocolIpcWriter, "i", &receptionInt);
    if ((retCode == 0) && (receptionInt == static_cast<int>(IpcComm_GetClipDbRequest))) {
      GetDatabase();
    }
    sleep(1);
  }
  zsock_destroy(&protocolIpcWriter);
  return 0;
}

void Server::GetDatabase() {
  uint32_t dataSize = 10820 * 340;
  uint8_t* data = new uint8_t[dataSize];
  uint32_t nbBytesWritten = DbHelper::SaveDbToBuffer(data, dataSize);
  int retCode = zsock_send(protocolIpcWriter, "b", data, nbBytesWritten);
}

最佳答案

我知道我的问题已经很老了,但为了记录起见,我从czmq切换为基本zmq api,一切顺利。我的一位同事在czmq层上也遇到了问题,因此改用zmq进行修复,因此绝对是我的建议。

关于c++ - czmq异步发送/接收(PUB/SUB),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35752517/

10-11 23:03
查看更多