我正在尝试编写一个处理程序类,该处理程序类订阅通过zeromq发布的消息并缓冲最后收到的消息。

我尝试这样做如下。包装器应用程序将在循环调用函数中调用ReceivedMessage()方法。返回true后,我尝试使用GetReceivedMessageData()访问消息。不幸的是,似乎数据没有正确保存在成员zmq_receivedMessage_中。

我猜这是因为zmq_receivedMessage_初始化为固定大小,并且调用zmq_subscriber_.recv(&zmq_receivedMessage_)不会自动调整其大小吗?

最简单,最可靠的方法是什么?我能想到的唯一方法是每次收到新消息时都使用realloc()memcpy()。还是有更简单的方法?

#include <cstdint>
#include "zeromq_cpp/zmq.hpp"

class HandlerClass
{
public:
    /// @brief Initializes a AirSimToRos class instance.
    HandlerClass(std::string const& addr);

    // @brief Gets the message data received via ZeroMq as pointer.
    void* GetReceivedMessageData();

    // @brief Gets the message size received via ZeroMq as size_t.
    std::size_t GetReceivedMessageSize();

    // @brief Returns true if a new, full message was received via ZeroMq, false otherwise
    bool ReceivedMessage();

private:
    /// @brief A ZeroMq context object encapsulating functionality dealing with the initialisation and termination.
    zmq::context_t zmq_context_;

    /// @brief A ZeroMq socket for subscribing to incoming messages.
    zmq::socket_t zmq_subscriber_;

    /// @brief A ZeroMq message that was received last. Might be empty if ReceivedMessage() never was true.
    zmq::message_t zmq_receivedMessage_;

};

HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt(ZMQ_IDENTITY, "HandlerSubscriber", 5);
    zmq_subscriber_.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    zmq_subscriber_.setsockopt(ZMQ_RCVTIMEO, 5000);
    zmq_subscriber_.connect(addr);
}

void* HandlerClass::GetReceivedMessageData()
{
    return zmq_receivedMessage_.data();
}

std::size_t HandlerClass::GetReceivedMessageSize()
{
    return zmq_receivedMessage_.size();
}

bool HandlerClass::ReceivedMessage()
{
    int received_bytes = zmq_subscriber_.recv(&zmq_receivedMessage_);

    return received_bytes > 0;
}

最佳答案

一种方法是使用 Poller -instance + ZMQ_CONFLATE重新设计

由于预期的类用例的上下文为零,因此原始设计似乎是数据移动器的“机械”包装,而不是任何MVP精简设计,可以最大限度地利用ZeroMQ可扩展形式通信原型(prototype)的优势信令/消息传递框架已经内置。

更加聪明(并且还有ZMQ_RCV_HWM -safer(超出本主题的范围))不是总是总是机械地从ZeroMQ Context-控制域中读取每条消息,除非真正需要重新传输此类数据从HandlerClass后面的某个地方开始。

添加 private Poller实例,该实例将允许重新设计数据流机制-使用非破坏性查询,使用.poll()-方法测试新消息的到达(还具有实时/事件处理循环稳定性)控制工具,不要等待超过临时设置的.poll() -method timeout),同时能够推迟任何实际的数据移动,直到数据确实需要流到HandlerClass -instance之外,而不是在任何地方较早。

HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt( ZMQ_IDENTITY,   "HandlerSubscriber", 5 );
    zmq_subscriber_.connect(                     addr );
    zmq_subscriber_.setsockopt( ZMQ_SUBSCRIBE,  "", 0 );
    zmq_subscriber_.setsockopt( ZMQ_LINGER,      0 );  // ALWAYS, READY 4 .term()
    zmq_subscriber_.setsockopt( ZMQ_CONFLATE,    1 );  // SMART
    zmq_subscriber_.setsockopt( ZMQ_TOS,         T );  // WORTH DEPLOY & MANAGE
    zmq_subscriber_.setsockopt( ZMQ_RCVTIMEO, 5000 );
 // -------------------------------------------------  // ADD Poller-instance
    ...
 // -------------------------------------------------  // RTO
}

Nota Bene:如果在ZeroMQ基础结构上也进行了扩展,那么可以使用省时的API工具将零复制消息重新编码到另一个ZeroMQ套接字传输中(几乎免费)-很酷,不是吗

关于c++ - 缓冲区最后收到的ZeroMQ消息作为类成员,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46120113/

10-11 23:09
查看更多