试图测试pub / sub的简单实现。我发现如果我离开订户并发送消息,则订户不会全部收到它们。有时会收到全部,有时会收到部分,有时不会收到整套。
运行订阅服务器(使其保持运行状态),然后多次运行发布服务器。
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public static void main (String[] args) {
// Prepare our context and subscriber
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.subscribe("B".getBytes());
System.out.println("Starting Subscriber..");
int i = 0;
while (true) {
String address = subscriber.recvStr();
String contents = subscriber.recvStr();
System.out.println(address+":"+new String(contents) + ": "+(i));
i++;
}
}
}
发布者:
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class TestPublisher {
public static void main (String[] args) throws Exception {
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
System.out.println("Starting Publisher..");
publisher.setIdentity("B".getBytes());
publisher.setHWM(1000);
for (int i = 0; i < 10; i++) {
Thread.sleep(10l);
publisher.sendMore("B");
boolean isSent = publisher.send("We would like to see this:"+i);
System.out.println("Message was sent "+i+" , "+isSent);
}
Thread.sleep(1000);
publisher.close ();
context.term ();
}
}
最佳答案
经过一番调试后,发现问题是发布套接字绑定时花了一些时间,尝试发布时只是丢弃了消息。在初始绑定上添加100ms的简单睡眠即可解决此问题。在生产环境中,发布者将在启动时就被绑定。
猜猜这是一个单一的解决方案。现在,具有平均数据量的发布/订阅的所有消息都可以正常工作而不会丢失任何数据。请在下面查看我的发布商的代码段更新。
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class TestPublisher {
public static void main (String[] args) throws Exception {
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
System.out.println("Starting Publisher..");
publisher.setIdentity("B".getBytes());
// for testing setting sleep at 100ms to ensure started.
Thread.sleep(100l);
for (int i = 1; i <= 10; i++) {
publisher.sendMore("B");
boolean isSent = publisher.send("X("+System.currentTimeMillis()+"):"+i);
System.out.println("Message was sent "+i+" , "+isSent);
}
publisher.close ();
context.term ();
}
}