试图测试pub / sub的简单实现。我发现如果我离开订户并发送消息,则订户不会全部收到它们。有时会收到全部,有时会收到部分,有时不会收到整套。

运行订阅服务器(使其保持运行状态),然后多次运行发布服务器。

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;


public static void main (String[] args) {

    // Prepare our context and subscriber
    Context context = ZMQ.context(1);
    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("tcp://localhost:5563");
    subscriber.subscribe("B".getBytes());


    System.out.println("Starting Subscriber..");
    int i = 0;
    while (true) {
        String address = subscriber.recvStr();
        String contents = subscriber.recvStr();
        System.out.println(address+":"+new String(contents) + ": "+(i));
        i++;
    }

}


}

发布者:

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;


public class TestPublisher {

public static void main (String[] args) throws Exception {
    Context context = ZMQ.context(1);
    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("tcp://*:5563");
    System.out.println("Starting Publisher..");
    publisher.setIdentity("B".getBytes());
    publisher.setHWM(1000);
    for (int i = 0; i < 10; i++) {
        Thread.sleep(10l);
        publisher.sendMore("B");
        boolean isSent = publisher.send("We would like to see this:"+i);
        System.out.println("Message was sent "+i+" , "+isSent);
    }

    Thread.sleep(1000);
    publisher.close ();
    context.term ();
}


}

最佳答案

经过一番调试后,发现问题是发布套接字绑定时花了一些时间,尝试发布时只是丢弃了消息。在初始绑定上添加100ms的简单睡眠即可解决此问题。在生产环境中,发布者将在启动时就被绑定。

猜猜这是一个单一的解决方案。现在,具有平均数据量的发布/订阅的所有消息都可以正常工作而不会丢失任何数据。请在下面查看我的发布商的代码段更新。

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;


public class TestPublisher {

    public static void main (String[] args) throws Exception {
        Context context = ZMQ.context(1);
        Socket publisher = context.socket(ZMQ.PUB);

        publisher.bind("tcp://*:5563");
        System.out.println("Starting Publisher..");
        publisher.setIdentity("B".getBytes());
        // for testing setting sleep at 100ms to ensure started.
        Thread.sleep(100l);
        for (int i = 1; i <= 10; i++) {
            publisher.sendMore("B");
            boolean isSent = publisher.send("X("+System.currentTimeMillis()+"):"+i);
            System.out.println("Message was sent "+i+" , "+isSent);
        }

        publisher.close ();
        context.term ();
    }
}

09-30 17:42
查看更多