我无法理解我的数据在哪里结束。

我编写了一个测试以确保我的Publisher类成功发送数据,并且如果我绑定到该数据,则可以接收到该数据。

该类本身继承自Thread,并公开一个publish()方法,我可以调用该方法来传递要通过Queue()广播给订户的数据。

但是,在我的测试中,数据永远不会到达。我保证使用相同的
端口,我想不到这里还有什么问题。

我是ZeroMQ的新手,但是我已经设法使PubSub模式可以使用。

测试代码:

# Import Built-ins
import time
import json
import queue
from queue import Queue
from threading import Thread

# Import Third-Party
import zmq


def test_publisher_sends_data(self):
    port = 667
    name, topic, data = 'TestNode', 'testing', ['this', 'is', 'data']
    encoded_name = json.dumps(name).encode('utf-8')
    encoded_topic = json.dumps(topic).encode('utf-8')
    encoded_data = json.dumps(data).encode('utf-8')
    expected_result = (encoded_name, encoded_topic, encoded_data)

    publisher = Publisher(port)
    print("starting publisher")
    publisher.start()

    q = Queue()

    def recv(q):
        ctx = zmq.Context()
        zmq_sock = ctx.socket(zmq.SUB)
        print("Connecting to publisher")
        zmq_sock.connect('tcp://127.0.0.1:%s' % port)
        while True:
            print("waiting for data..")
            q.put(zmq_sock.recv_multipart())
            print("data received!")
    t = Thread(target=recv, args=(q,))
    t.start()

    print("sending data via publisher")
    for i in range(5):
        self.assertTrue(publisher.publish(name, topic, data))
        time.sleep(0.1)
    print("checking q for received data..")
    try:
        result = q.get(block=False)
    except queue.Empty:
        self.fail("Queue was empty, no data received!")
    self.assertEqual(expected_result, result)


Publisher

# Import Built-Ins
import logging
import json
from queue import Queue
from threading import Thread, Event

# Import Third-Party
import zmq


class Publisher(Thread):
    """Publisher Class which allows publishing data to subscribers.

    The publishing is realized with ZMQ Publisher sockets, and supports publishing
    to multiple subscribers.

    The run() method continuosly checks for data on the internal q, which is fed
    by the publish() method.

    """
    def __init__(self, port, *args, **kwargs):
        """Initialize Instance.
        :param port:
        """
        self.port = port
        self._running = Event()
        self.sock = None
        self.q = Queue()
        super(Publisher, self).__init__(*args, **kwargs)

    def publish(self, node_name, topic, data):
        """Publish the given data to all current subscribers.

        All parameters must be json-serializable objects
        :param data:
        :return:
        """
        message_parts = [json.dumps(param).encode('utf-8')
                         for param in (node_name, topic, data)]
        if self.sock:
            self.q.put(message_parts)
            return True
        else:
            return False

    def join(self, timeout=None):
        self._running.clear()
        try:
            self.sock.close()
        except Exception:
            pass
        super(Publisher, self).join(timeout)

    def run(self):
        self._running.set()
        ctx = zmq.Context()
        self.sock = ctx.socket(zmq.PUB)
        self.sock.bind("tcp://*:%s" % self.port)
        while self._running.is_set():
            if not self.q.empty():
                msg_parts = self.q.get(block=False)
                print("Sending data:", msg_parts)
                self.sock.send_multipart(msg_parts)
            else:
                continue
        ctx.destroy()
        self.sock = None

最佳答案

添加.setsockopt( zmq.SUBSCRIBE, someNonZeroLengthSTRING )


记录的默认SUB -socket实例未订阅任何内容

(自然地)
如果任何传入消息与字符串中的任何一个都不匹配,则预订了SUB端,本地.recv()自然不会收到这样的消息。

鉴于您的代码没有明确的订阅,因此没有这样的消息可以满足主题过滤器处理条件Q.E.D.



最好的下一步:

另一个问题-可能会出现“后期连接”问题-如果单元测试设计快速进行,那么我可能建议您进一步(不仅是ZeroMQ)分布式系统设计的最佳下一步是花费Pieter HINTJENS的精彩著作“ Code Connected,第1卷”。认真研究异构分布式系统信令/消息传递的任何人都将享受他在技术和非技术方面的观点和见解。

07-25 22:47
查看更多