我正在将pyzmq
库与 pub/sub 模式一起使用。我通过.connect()
方法给一些快速的ZMQ发布者,通过.bind()
方法给ozt_strong一个较慢的ZMQ用户。
然后,几分钟后,我的订阅者从发布者那里获取了旧的发布数据(由于ZMQ缓冲区而导致了)。
我的问题:
有什么方法可以管理 ZMQ队列缓冲区大小? (设置有限的缓冲区)
[注意]:high watermark
选项,但是没有用:
发布者:
订阅者:import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10) # not working
while True:
data = time.time()
print("%d" % data)
socket.send("%d" % data)
time.sleep(1)
即使使用这些选项,队列大小仍然超过10(使用import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10) # not working
while 1:
time.sleep(2) # A speed reducer like.
data = socket.recv()
print(data)
send/receive high watermark
配置)。
最佳答案
我找到了一种在ZMQ
订阅套接字中使用“仅最后一条消息”选项的方法(使用CONFLATE
选项)。
但是首先是,您应该在连接之前设置CONFLATE
选项:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while 1:
time.sleep(2) # Dummy delay
data = socket.recv()
print(data)
换句话说,我删除了订户代码中的所有缓冲队列。 [注意]:
另外,使用
zmq.SNDBUF
和zmq.RCVBUF
选项,我们可以设置ZMQ缓冲区大小的限制。 (More complete and an example)关于python - 如何在python中限制ZMQ(ZeroMQ-PyZMQ)队列缓冲区大小?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48278859/