根据ZeroMQ文档,一旦排队的消息数量达到最高水位,发布套接字应该丢弃消息。
在以下示例中,这似乎不起作用(是的,在绑定/连接之前,我确实设置了hwm
):
import time
import pickle
from threading import Thread
import zmq
ctx = zmq.Context()
def pub_thread():
pub = ctx.socket(zmq.PUB)
pub.set_hwm(2)
pub.bind('tcp://*:5555')
i = 0
while True:
# Send message every 100ms
time.sleep(0.1)
pub.send_string("test", zmq.SNDMORE)
pub.send_pyobj(i)
i += 1
def sub_thread():
sub = ctx.socket(zmq.SUB)
sub.subscribe("test")
sub.connect('tcp://localhost:5555')
while True:
# Receive messages only every second
time.sleep(1)
msg = sub.recv_multipart()
print("Sub: %d" % pickle.loads(msg[1]))
t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()
while True:
pass
我在pub上发送消息的速度比在子套接字上读取消息的速度快10倍,
hwm
设置为2。我希望只能收到大约每10条消息。相反,我看到以下输出:Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...
所以我看到所有消息都到达了,因此它们一直排在队列中,直到我读完为止。在连接之前在子插座上添加hwm = 2时也是如此。
我在做什么错还是误解了
hwm
?我使用pyzmq版本17.1.2
最佳答案
通过借用issue which I opened in Github的答案,我将答案更新如下:
消息保存在操作系统的网络缓冲区中。我找到
因此,HWM没那么有用。这是修改后的代码
订户错过消息的地方:
import time
import pickle
import zmq
from threading import Thread
import os
ctx = zmq.Context()
def pub_thread():
pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, 2)
pub.setsockopt(zmq.SNDBUF, 2*1024) # See: http://api.zeromq.org/4-2:zmq-setsockopt
pub.bind('tcp://*:5555')
i = 0
while True:
time.sleep(0.001)
pub.send_string(str(i), zmq.SNDMORE)
pub.send(os.urandom(1024))
i += 1
def sub_thread():
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.setsockopt(zmq.RCVHWM, 2)
sub.setsockopt(zmq.RCVBUF, 2*1024)
sub.connect('tcp://localhost:5555')
while True:
time.sleep(0.1)
msg, _ = sub.recv_multipart()
print("Received:", msg.decode())
t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()
输出看起来像这样:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270
由于所有队列/缓冲区已满并且发布者,因此错过了消息
开始删除消息(请参阅ZMQ_PUB的文档:
http://api.zeromq.org/4-2:zmq-socket)。
[注意]:
您应该在侦听器/订阅者和广告商/发布者中使用高水印选项。
这些帖子也相关(Post1-Post2)
sock.setsockopt(zmq.CONFLATE, 1)
是仅获取在订户方定义的最后一条消息的另一种选择。关于python - Pyzmq高水位标记在酒吧 socket 上不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53356451/