ZMQ multiple publishers and a single subscriber: data loss observed


Problem description

I have created two publishers that connect to the same static endpoint.

Publisher 1:

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendTwoTables()
{

   cout << "In SendTwoTables" <<endl;

   while(1) {
   canlogreq canLogObj = canlogreq::default_instance();
   canLogObj.set_fromhours(11);
   canLogObj.set_fromminutes(7);
   canLogObj.set_fromseconds(2);
   canLogObj.set_fromday(16);
   canLogObj.set_frommonth(5);
   canLogObj.set_fromyear(2020);
   canLogObj.set_tohours(12);
   canLogObj.set_tominutes(7);
   canLogObj.set_toseconds(4);
   canLogObj.set_today(17);
   canLogObj.set_tomonth(5);
   canLogObj.set_toyear(2020);

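   // Frame layout: 2-byte message ID followed by the serialized protobuf payload.
   zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));
   *((uint16_t*)logsnippetmsg.data()) = 20;  // message ID the subscriber dispatches on

   // Cast to char* before offsetting; arithmetic on void* is not standard C++.
   canLogObj.SerializeToArray(static_cast<char*>(logsnippetmsg.data()) + sizeof(uint16_t), canLogObj.ByteSizeLong());

   socket.send(logsnippetmsg);
   usleep(1);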


   canLogObj.clear_fromhours();
   canLogObj.clear_fromminutes();
   canLogObj.clear_fromseconds();
   canLogObj.clear_fromday();
   canLogObj.clear_frommonth();
   canLogObj.clear_fromyear();
   canLogObj.clear_tohours();
   canLogObj.clear_tominutes();
   canLogObj.clear_toseconds();
   canLogObj.clear_today();
   canLogObj.clear_tomonth();
   canLogObj.clear_toyear();

}

}

Publisher 2:

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd:: SendData() {

while (std::getline(file, line_str)) {
        std::stringstream ss(line_str);
        double tdiff;
        int i;
        char J;
        int _1939;
        int pgn;
        char p;
        int priority;
        char _0;
        int source;
        char dash;
        std::string direction;
        char d;
        int length;
        int data[8];

        ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
           >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
           >> data[3] >> data[4] >> data[5] >> data[6] >> data[7];

        timestamp += tdiff;

        while (gcl_get_time_ms() - start_time <
                uint64_t(timestamp * 1000.0) - first_time) { usleep(1); }

        if (arguments.verbose) {
            std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
                << pgn << " " << p << " " << priority << " " << _0 << " " << source
                << " " << dash << " " << direction << " " << d << " " << length
                << " " << data[0] << " " << data[1] << " " << data[2] << " "
                << data[3] << " " << data[4] << " " << data[5] << " " << data[6]
                << " " << data[7] << std::endl;
        }

        uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);

        protoTable.add_columnvalues(uint64ToString(timestamp_ms) /* timestamp */);
        protoTable.add_columnvalues(intToString(pgn) /* PGN */);
        protoTable.add_columnvalues(intToString(priority) /* Priority */);
        protoTable.add_columnvalues(intToString(source) /* Source */);
        protoTable.add_columnvalues(direction /* Direction */);
        protoTable.add_columnvalues(intToString(length) /* Length */);
        protoTable.add_columnvalues(intToString(data[0]) /* data1 */);
        protoTable.add_columnvalues(intToString(data[1]) /* data2 */);
        protoTable.add_columnvalues(intToString(data[2]) /* data3 */);
        protoTable.add_columnvalues(intToString(data[3]) /* data4 */);
        protoTable.add_columnvalues(intToString(data[4]) /* data5 */);
        protoTable.add_columnvalues(intToString(data[5]) /* data6 */);
        protoTable.add_columnvalues(intToString(data[6]) /* data7 */);
        protoTable.add_columnvalues(intToString(data[7]) /* data8 */);

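        // Frame layout: 2-byte message ID followed by the serialized protobuf payload.
        zmq::message_t create_values(protoTable.ByteSizeLong() + sizeof(uint16_t));
        *((uint16_t*)create_values.data()) = TABLEMSG_ID;  // ID
        // Cast to char* before offsetting; arithmetic on void* is not standard C++.
        protoTable.SerializeToArray(static_cast<char*>(create_values.data()) + sizeof(uint16_t), protoTable.ByteSizeLong());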

        socket.send(create_values);


        protoTable.clear_columnvalues();
        usleep(1);
    }

}

Subscriber:

TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
    socket.bind("tcp://*:5555");
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
    cout <<"TransportLayer::Receive " << " I m in server " << endl;

    static int count = 1 ;
    // Producer thread.
    while ( true ){

    zmq::message_t request;
    string protoBuf;

    socket.recv(&request);

    uint16_t id = *((uint16_t*)request.data());

    cout <<"TransportLayer : "<<"request.data:  "<< request.data() << endl;
    cout << "TransportLayer: count " << count<< endl; count = count+1 ;
    cout <<"TransportLayer : request.data.size "<< request.size() << endl;


    // Skip the 2-byte ID; cast to char* first because arithmetic on void* is not standard C++.
    protoBuf = std::string(static_cast<char*>(request.data()) + sizeof(uint16_t), request.size() - sizeof(uint16_t));
    cout << "ProtoBuf : " << protoBuf << endl;

     InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();

     switch(id) {
        case TABLEMSG_ID: cout << "Canlyser" << endl;
      interfaceLayObj->ParseProtoBufTable(protoBuf);
            break;
        case LOGSNIPPET_ID:cout << "LogSnip" << endl;
             interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
             interfaceLayObj->logsnippetSignal(); // publish the signal
            break;
        default:
            break;
    }


    usleep(1);

    }

}

Observations:

I) Execution order:
1. Started the Subscriber.
2. Started Publisher1 (it sent only one data value).

The Subscriber did not receive this data.

II) Modified Publisher1 to send the same data in a while loop. Execution order:
1. Started the Subscriber.
2. Started Publisher1.
3. Started Publisher2.

Now I see that the Subscriber is receiving data from both publishers.

This suggests that data loss is possible.

How do I ensure there is absolutely no data loss?

Thanks.

Recommended answer

Your producer may be sending its one message before the subscription handshake has completed. As a result, the PUB socket discards the message because no subscription is registered yet.

If you use an XPUB socket (ZMQ_XPUB, see http://api.zeromq.org/4-2:zmq-socket) instead of a PUB socket, your program can wait for a subscribe message before sending its message(s), so that it knows someone is listening.
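As a rough illustration of waiting for a subscribe message: an XPUB socket delivers each subscription to the application as a frame whose first byte is 1 (subscribe) or 0 (unsubscribe), followed by the topic prefix. The sketch below decodes such a frame. The names `SubscriptionEvent` and `parse_subscription_frame` are hypothetical helpers introduced only for this example, and the trailing comment shows one possible way to use them with the cppzmq calls from the question.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper type: decoded form of one subscription frame
// as delivered to the application by an XPUB socket.
struct SubscriptionEvent {
    bool subscribe;     // true if first byte is 1 (subscribe), false if 0 (unsubscribe)
    std::string topic;  // topic prefix; empty means "all messages"
};

// Hypothetical helper: returns true and fills `out` if the buffer looks like
// a subscription frame, false for an empty buffer or an unknown first byte.
bool parse_subscription_frame(const void* data, std::size_t size, SubscriptionEvent& out)
{
    if (data == nullptr || size == 0) {
        return false;
    }
    const unsigned char* bytes = static_cast<const unsigned char*>(data);
    if (bytes[0] > 1) {
        return false;  // neither subscribe (1) nor unsubscribe (0)
    }
    out.subscribe = (bytes[0] == 1);
    out.topic.assign(reinterpret_cast<const char*>(bytes + 1), size - 1);
    return true;
}

// Usage sketch with the cppzmq calls already used in the question
// (assumption: not compiled here, adapt to your cppzmq version):
//
//   zmq::socket_t socket(context, ZMQ_XPUB);
//   socket.connect("tcp://127.0.0.1:5555");
//   zmq::message_t frame;
//   SubscriptionEvent ev;
//   do {
//       socket.recv(&frame);  // blocks until a subscription frame arrives
//   } while (!parse_subscription_frame(frame.data(), frame.size(), ev) || !ev.subscribe);
//   // A subscriber is now registered; start sending.
```

Because the subscriber in the question subscribes with an empty prefix, the frame it produces would be the single byte 0x01. Note that this only addresses messages sent before the subscription is registered; a PUB or XPUB socket still drops messages for a subscriber whose high-water mark has been reached, so it is not by itself a guarantee of zero loss.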
