本文介绍了ZeroMQ订阅服务器未通过inproc:传输类接收到发布者的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 pyzmq 很陌生。我试图理解 inproc: 运输类,并创建了此示例示例以供使用。

I am fairly new to pyzmq. I am trying to understand inproc: transport class and have created this sample example to play with.

看起来 Publisher 实例正在发布消息,但 Subscriber 实例接收任何实例。

It looks a Publisher instance is publishing messages but Subscriber instances are not receiving any.

如果我移动了 Subscriber 实例进入单独的 进程 ,并将 inproc:更改为 tcp: 运输类,该示例有效。

In case I move Subscriber instances into a separate process and change inproc: to a tcp: transport class, the example works.

这里是代码:

import threading
import time

import zmq

context = zmq.Context.instance()

address = 'inproc://test'


class Publisher(threading.Thread):
    def __init__(self):
        self.socket = context.socket(zmq.PUB)

        self.socket.bind(address)

    def run(self):
        while True:
            message = 'snapshot,current_time_%s' % str(time.time())
            print 'sending message %s' % message
            self.socket.send(message)
            time.sleep(1)


class Subscriber(object):
    def __init__(self, sub_name):
        self.name = sub_name
        self.socket = context.socket(zmq.SUB)
        self.socket.connect(address)

    def listen(self):
        while True:
            try:
                msg = self.socket.recv()
                a, b = msg.split(' ', 1)
                print 'Received message -> %s-%s-%s' % (self.name, a, b)
            except zmq.ZMQError as e:
                logger.exception(e)


if __name__ == '__main__':
    thread_a = []
    for i in range(0, 1):
        subs = Subscriber('subscriber_%s' % str(i))
        th = threading.Thread(target=subs.listen)
        thread_a.append(th)
        th.start()

    pub = Publisher()
    pub_th = threading.Thread(target=pub.run)

    pub_th.start()


推荐答案

没错,但是



ZeroMQ是一个很棒的工具箱。

它充斥着各种聪明,聪明和自适应的服务,可以从许多方面切实地挽救我们的贫困生活。

仍然值得阅读并遵守文档中的一些规则。

There is nothing wrong, but

ZeroMQ is a wonderfull toolbox.

It is full of smart, bright and self-adapting services under the hood, that literally save our poor lives in many ways.

Still it is worth to read and obey a few rules from the documentation.

inproc 运输类就是这样的。 .bind() 首先,在 .connect() 之前> -s

inproc transport class has one such. .bind() first, before .connect()-s

因此,例如:

if __name__ == '__main__':

    pub    = Publisher()
    pub_th = threading.Thread( target = pub.run )
    pub_th.start()
    # give it a place to start before .connect()-s may take place
    # give it a time  to start before .connect()-s may take place
    sleep(0.5)

    thread_a = []
    for i in range( 0, 1 ):
        subs = Subscriber( 'subscriber_%s' % str( i ) )
        th   = threading.Thread( target = subs.listen )
        thread_a.append( th )
        th.start()

这篇关于ZeroMQ订阅服务器未通过inproc:传输类接收到发布者的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-18 21:30