问题描述
我正尝试使用XPUB和XSUB来实现,如下图所示.我已经仔细阅读了所提供的示例,但在Java中无法获得XPUB和XSUB的示例. 此处他们用C语言给出了一个例子很复杂,因为我是ZeroMQ的新手.
I am trying to implement using XPUB and XSUB as provided in this below figure. I have gone through their examples provided but could not get one for XPUB and XSUB in Java. Here they have given an example in C which is little complex as I am new to ZeroMQ.
我正在尝试使用 jni包装版本在android中使用它.请帮助我找到一个示例,说明如何使用java在ZeroMQ中实现此带代理的Sub-Sub网络.
I am trying to use it in android using jni wrapped version. Please help me to find an example, how to implement this Pub-Sub Network with a Proxy in ZeroMQ using java.
目前,我指的是 http://zguide.zeromq.org/page:all
我尝试将其移植如下. Subscriber.java
I have tried to port it as follows.Subscriber.java
公共类订户扩展了线程实现Runnable {public class Subscriber extends Thread implements Runnable {
private static final String TAG = "Subscriber"; private Context ctx; public Subscriber(ZMQ.Context z_context) { this.ctx = z_context; } @Override public void run() { super.run(); ZMQ.Socket mulServiceSubscriber = ctx.socket(ZMQ.SUB); mulServiceSubscriber.connect("tcp://localhost:6001"); mulServiceSubscriber.subscribe("A".getBytes()); mulServiceSubscriber.subscribe("B".getBytes()); while (true) { Log.d(TAG, "Subscriber loop started.."); String content = new String(mulServiceSubscriber.recv(0)); Log.d(TAG, "Subscriber Received : "+content); } }
}
Publisher.java
公共类Publisher扩展了Thread实现Runnable {Publisher.java
public class Publisher extends Thread implements Runnable {
private static final String TAG = "Publisher"; private Context ctx; public Publisher(ZMQ.Context z_context) { this.ctx = z_context; } @Override public void run() { super.run(); ZMQ.Socket publisher = ctx.socket(ZMQ.PUB); publisher.connect("tcp://localhost:6000"); while (true) { Log.d(TAG, "Publisher loop started.."); publisher.send(("A Hello " + new Random(100).nextInt()).getBytes() , 0); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
}
XListener.java (现在是一个简单的转发器)
XListener.java (For now a simple Forwarder)
公共类XListener扩展线程实现Runnable {public class XListener extends Thread implements Runnable {
private static final String TAG = null; private Socket publisherX; private Context ctx; private Socket subscriberX; public XListener(ZMQ.Context ctx, ZMQ.Socket subscriberX, ZMQ.Socket publisherX) { this.ctx = ctx; this.subscriberX = subscriberX; this.publisherX = publisherX; } @Override public void run() { super.run(); while (true) { Log.d(TAG, "XListener loop started.."); String msg = new String(subscriberX.recvStr()); Log.v(TAG, "Listener Received: " +"MSG :"+msg); publisherX.send(msg.getBytes(), 0); } }
}
在应用程序 main()
private void main(){ ZMQ.Context ctx = ZMQ.context(1);private void main() { ZMQ.Context ctx = ZMQ.context(1);使用该代码,我无法收听 XSUB .移植 espresso.c 时,我无法在ZMQ的Java绑定中找到任何包装.如何实现简单的代理,或者我缺少什么??
With the code I am not able to listen XSUB. While porting espresso.c, I was not able to find any wrapper in java bindings of ZMQ. How to implement a simple proxy or am I missing something??
推荐答案
哇,我在回答自己的问题.我错过了将转发器从 publisherX 添加到 subscriberX 的过程.这是缺少的代码.现在,XSUB和XPUB能够发送和获取数据.
Wow I'm answering my own question. I missed to add a forwarder from publisherX to subscriberX. Here is the missing code. Now XSUB and XPUB are able to send and get data.
公共类XSender扩展了Thread实现Runnable {public class XSender extends Thread implements Runnable {
private static final String TAG = null; private Socket publisherX; private Context ctx; private Socket subscriberX; public XSender(ZMQ.Context ctx, ZMQ.Socket subscriberX, ZMQ.Socket publisherX) { this.ctx = ctx; this.subscriberX = subscriberX; this.publisherX = publisherX; } @Override public void run() { super.run(); while (true) { // Read envelope with address Log.d(TAG, "XListener loop started.."); String msg = new String(subscriberX.recv(0)); Log.v(TAG, "Listener Received: " +"MSG :"+msg); publisherX.send(msg.getBytes(), 0); } }
}
这篇关于如何在ZeroMQ(jzmq)3.xx中使用XPUB和XSUB通过代理实现Pub-Sub网络的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!