我正在将pyzmq库与 pub/sub 模式一起使用。我通过.connect()方法给一些快速的ZMQ发布者,通过.bind()方法给ozt_strong一个较慢的ZMQ用户。
然后,几分钟后,我的订阅者从发布者那里获取了旧的发布数据(由于ZMQ缓冲区而导致了)。

我的问题:
有什么方法可以管理 ZMQ队列缓冲区大小? (设置有限的缓冲区)
[注意]:

  • 我不想使用ZMQ PUSH/PULL。
  • 我已经阅读了这篇文章,但是这种方法仅清除缓冲区:clear ZMQ buffer
  • 我也尝试了high watermark选项,但是没有用:



  • 发布者:
    import zmq
    import time
    
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    socket.setsockopt(zmq.SNDHWM, 10)  # not working
    
    while True:
        data = time.time()
        print("%d" % data)
        socket.send("%d" % data)
        time.sleep(1)
    
    订阅者:
    import zmq
    import time
    
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:%s" % port)
    socket.setsockopt(zmq.SUBSCRIBE, '')
    socket.setsockopt(zmq.RCVHWM, 10)  # not working
    
    while 1:
        time.sleep(2)  # A speed reducer like.
        data = socket.recv()
        print(data)
    
    即使使用这些选项,队列大小仍然超过10(使用send/receive high watermark配置)。

    最佳答案

    我找到了一种在ZMQ订阅套接字中使用“仅最后一条消息”选项的方法(使用CONFLATE选项)。
    但是首先是,您应该在连接之前设置CONFLATE选项:

    import zmq
    import time
    
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    
    socket.setsockopt(zmq.SUBSCRIBE, '')
    socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
    socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.
    
    while 1:
        time.sleep(2)  # Dummy delay
        data = socket.recv()
        print(data)
    
    换句话说,我删除了订户代码中的所有缓冲队列。

    [注意]:
    另外,使用zmq.SNDBUFzmq.RCVBUF选项,我们可以设置ZMQ缓冲区大小的限制。 (More complete and an example)

    关于python - 如何在python中限制ZMQ(ZeroMQ-PyZMQ)队列缓冲区大小?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48278859/

    10-12 20:19