我发现了一个类似的问题 ZeroMQ: HWM on PUSH does not work ,但它无法解决我的问题。
我想控制推送套接字排队的消息数量,但它不起作用并且仍然排队 1000 条消息。
所以我想知道如何设置push socket的hwm。提前致谢。
我的环境是:libzmq 4.0.4、pyzmq 14.1.0、python 3.3
这是我的代码:
服务器.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import random
import zmq
class TestPush(object):
def __init__(self):
self.ctx = zmq.Context()
random.seed()
def run(self):
task_snd = self.ctx.socket(zmq.PUSH)
task_snd.setsockopt(zmq.SNDHWM, 10)
task_snd.bind('tcp://*:53000')
while True:
workload = str(random.randint(1, 100))
task_snd.send(workload.encode('utf-8'))
print('Send {0}'.format(workload))
if __name__ == '__main__':
test_push = TestPush()
test_push.run()
客户端.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import random
import zmq
class TestPull(object):
def __init__(self):
self.ctx = zmq.Context()
def run(self):
task_rcv = self.ctx.socket(zmq.PULL)
task_rcv.setsockopt(zmq.RCVHWM, 1)
task_rcv.connect('tcp://localhost:53000')
while True:
msg = task_rcv.recv()
print('Receive msg: {0}'.format(msg))
time.sleep(random.randint(2, 3))
if __name__ == '__main__':
test_pull = TestPull()
test_pull.run()
最佳答案
当我尝试在推拉式 socket 上设置 HWM(高水位线)时,我遇到了与 ZeroMQ 类似的问题。甚至,它也不适用于 pub 和 sub 套接字。我想解释一下发生了什么以及它是如何解决的。
我制作了 2 个脚本,第一个作为带有推式套接字的发送者,另一个作为带有拉式套接字的接收者。将两个套接字上的 HWM 设置为 10。在接收器脚本中,我在收到每条消息后延迟 5 秒。然后我用 100 条消息的循环运行发送者脚本(在保持接收器运行以接收之后)。
我所期望的:
接收方队列和发送方队列将到达 hwm。之后,发件人将停止发送更多消息。
但是发生了什么:
发件人发送了所有 100 条消息并退出。但是接收者一直在一个接一个地处理消息,直到它接收到所有消息。
经过研究我找到了原因:
有一种叫做内核套接字缓冲区的东西,它位于发送方套接字和接收方套接字之间。每当进程打开套接字时,内核都会为 tcp 套接字分配内存空间,默认为 128KB。内核套接字缓冲区适用于发送方和接收方套接字(因此总缓冲区将为 128KB + 128KB)。我的消息大小以字节为单位(带有一些字符的 int)。因此,总的消息缓冲如下:
总缓冲区消息 =
发件人套接字 hwm
+ 发送者套接字的内核套接字缓冲区 (128KB)
+ 接收器 socket hwm
+ 发送者套接字的内核套接字缓冲区 (128KB)
解决方案:
现在,我将消息长度更改为 1KB 多一点。然后再次执行测试,发现发送了大约 260 条消息(如预期),此后发送方停止,直到接收方收到一些消息并再次启动。
附加信息
为了使推送套接字在接收方无法接收的情况下继续发送消息,我们可以在发送例程中使用 NOBLOCK 选项,但是接收方丢失消息的数量会增加很多。因此,更好的选择是使用 Poll 或 timeout,然后使用 NOBLOCK 选项调用发送例程。
请注意,您可以使用 zeromq 的 SNDBUFF/RCVBUFF,但操作系统可能不遵守它(就我而言,它不起作用)。
关于python - 如何在 zmq 的推/拉模式中设置 hwm?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/22613737/