本文介绍了ZMQ:多个订阅者的 XPUB 套接字上没有订阅消息(最后值缓存模式)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我实现了 ZMQ 的最后一个值缓存 (LVC) 示例(http://zguide.zeromq.org/php:chapter5#Last-Value-Caching),但无法让第二个订阅者在后端注册.

I implemented the Last Value Caching (LVC) example of ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), but can't get a 2nd subscriber to register at the backend.

订阅者第一次加入时,满足 event[0] == b'\x01' 条件并发送缓存值,但第二个订阅者(同一主题)不甚至不注册(if backend in events: 永远不会为真).其他一切正常.数据从发布者传递给订阅者(所有).

The first time a subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but the second subscriber (same topic) doesn't even register (if backend in events: is never true). Everything else works fine. Data gets passed from the publisher to the subscribers (all).

这可能是什么原因?后端的连接方式是否正确?这种模式是否只适用于第一个订阅者?

What could be the reason for this? Is the way the backend is connected correct? Is this pattern only supposed to work with the first subscriber?

更新

当我为第二个订阅者订阅另一个主题时,我得到了正确的行为(即订阅时\x01).这似乎真的适用于第一个订阅者 onlt .是 ZeroMQ 中的错误吗?

When I subscribe the 2nd subscriber to another topic, I get the right behaviour (i.e. \x01 when subscribing). This really seems to work for the first subscriber onlt . Is is a bug in ZeroMQ?

更新 2

这是一个最小的工作示例,表明 LVC 模式不起作用(至少不是这里实现的方式).

Here's a minimal working example that shows that the LVC pattern is not working (at least not the way it's implemented here).

# subscriber.py
import zmq

def main():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5558")

    # Subscribe to every single topic from publisher
    print 'subscribing (sub side)'
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if sub in events:
            msg = sub.recv_multipart()
            topic, current = msg
            print 'received %s on topic %s' % (current, topic)

if __name__ == '__main__':
    main()

这是代理(如示例中所示,但有更多的冗长和集成发布者).

And here's the broker (as in the example, but with a bit more verbosity and an integrated publisher).

# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time


class Publisher(threading.Thread):
    def __init__(self):
        super(Publisher, self).__init__()

    def run(self):
        time.sleep(10)
        ctx = zmq.Context.instance()
        pub = ctx.socket(zmq.PUB)
        pub.connect("tcp://127.0.0.1:5557")

        cnt = 0
        while True:
            msg = 'hello %d' % cnt
            print 'publisher is publishing %s' % msg
            pub.send_multipart(['my-topic', msg])
            cnt += 1
            time.sleep(5)


def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)


    # launch a publisher
    p = Publisher()
    p.daemon = True
    p.start()

    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        ### this is where it fails for the 2nd subscriber.
        ### There's never even an event from the backend
        ### in events when the 2nd subscriber is subscribing.

        # When we get a new subscription we pull data from the cache:
        if backend in events:
            print 'message from subscriber'
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                print ' => subscribe to %s' % topic
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])
            elif event[0] == b'\x00':
                topic = event[1:]
                print ' => unsubscribe from %s' % topic

if __name__ == '__main__':
    main()

运行此代码 (1 x broker.py, 2 x subscriber.py) 显示第一个订阅者按预期在代理注册 (\x01 和缓存查找),但第二个订阅者不会以相同的方式注册.有趣的是,第二个订阅者连接到 pub/sub 频道,因为一段时间(10 秒)后,两个订阅者都从发布者接收数据.

Running this code (1 x broker.py, 2 x subscriber.py) shows that the first subscriber registers at the broker as expected (\x01 and cache lookup), but the 2nd subscriber does not get registered the same way. Interestingly, the 2nd subscriber is hooked up to the pub/sub channel, as after a while (10 sec) both subscribers receive data from the publisher.

这很奇怪.也许我的一些图书馆已经过时了.这是我得到的:

This is very strange. Perhaps some of my libraries are outdated. Here's what I got:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'

$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
  Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘

更新 3

这种行为也可以在 zeromq 4.1.2pyzmq-14.7.0(安装或不安装 libpgm 和 libsodium)中观察到.

This behaviour can also be observed in zeromq 4.1.2 and pyzmq-14.7.0 (with or without libpgm and libsodium installed).

更新 4

另一个观察表明第一个订阅者的处理方式有所不同:第一个订阅者是唯一一个以预期方式从 XPUB 套接字(backend)取消订阅的订阅者其订阅主题为 \x00.其他订阅者(我尝试了 2 个以上)在后端频道上保持静音(尽管接收消息).

Another observation suggests that the first subscriber is somehow handled differently: The first subscriber is the only one unsubscribing in the expected way from the XPUB socket (backend) by preceding its subscription topic with \x00. The other subscribers (I tried more than 2) stayed mute on the backend channel (although receiving messages).

更新 5

我希望我不会陷入困境,但我已经研究了 czmq 绑定并在 C 中运行了我的 Python 示例.结果是一样的,所以我猜不是绑定有问题,但是 libzmq.

I hope I'm not going down a rabbit hole, but I've looked into the czmq bindings and ran my Python example in C. The results are the same, so I guess it's not a problem with the bindings, but with libzmq.

我还验证了第二个订阅者正在发送订阅消息,我确实可以在网上看到这一点:

I also verified that the 2nd subscriber is sending a subscribe message and indeed I can see this on the wire:

首次订阅:

0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
0040  70 69 63                                           pic

标记并解释了差异的第二条订阅消息(与上面不同).在订阅帧中发送相同的数据.

2nd subscribe message with difference (to above) marked and explained. The same data is sent in the subscribe frame.

                               identification
                               v
0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                             src port      sequence number
                                  v        v  v  v  v
0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........

Acknowledgement number   window scaling factor
      v  v  v  v           v
0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......

timestamp value  timestamp echo reply
            v           v  v   |<-------- data -------
0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to

      ------>|
0040  70 69 63                                           pic

推荐答案

我找到了这个问题的解决方案,即使我从头到尾阅读了文档,我也没有看到它.关键是XPUB_VERBOSE.将此行添加到后端初始化之后,一切正常

I found the solution for this problem, and even though I read the docs front to back and back to front, I had not seen it. The key is XPUB_VERBOSE. Add this line to after the backend initialisation and everything works fine

backend.setsockopt(zmq.XPUB_VERBOSE, True)

以下是官方文档的摘录:

ZMQ_XPUB_VERBOSE:在 XPUB 套接字上提供所有订阅消息设置新订阅的 XPUB 套接字行为和退订.值为 0 是默认值并且仅传递新的订阅消息到上游.值为 1 通过所有上游订阅消息.

选项值类型 int 选项值单位 0, 1 默认值 0适用的socket类型ZMQ_XPUB

Option value type int Option value unit 0, 1 Default value 0 Applicable socket types ZMQ_XPUB

Pieter Hintjens 在他的博客中有更多关于此的信息.这是相关部分:

Pieter Hintjens has some more information on this in his blog. This is the relevant section:

几个月前,我们添加了一个简洁的小选项 (ZMQ_XPUB_VERBOSE) 到XPUB 套接字禁用重复订阅的过滤.这现在适用于任意数量的订阅者.我们使用它如下:

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

应该更新 LVC 模式描述以反映此设置,否则此模式将无法工作.

The LVC pattern description should be updated to reflect this setting, as this pattern won't work otherwise.

这篇关于ZMQ:多个订阅者的 XPUB 套接字上没有订阅消息(最后值缓存模式)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-07 03:00