我试图用一个服务器来编写服务器/客户端脚本,以完成任务,并由多个工作人员执行该任务。
问题是我的呼吸机执行的任务太多,以至于会在心跳中填满内存。
我试图在绑定(bind)之前设置HWM,但是没有成功。一旦工作人员连接,它就继续发送消息,而完全忽略了所设置的HWM。我还有一个接收器,用于记录已完成的任务。
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
下沉
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
最佳答案
好的,我玩得很开心,我认为问题不在于PUSH HWM,而是您不能为PULL设置HWM。如果您查看this documentation,那么您会看到那里说不适用,表示对HWM采取的措施。
PULL套接字似乎每个都要接收数百条消息(我确实尝试设置HWM,以防万一它在PULL套接字上做任何事情。它没有。)我通过更改呼吸机来发送带有递增整数的消息,并更改了池中的每个工作程序在两次调用recv()
之间等待2秒钟来证明了这一点。 worker 们打印出他们正在处理具有截然不同的整数的消息。例如,一个工作人员将处理消息10,而下一个工作人员将处理消息400。随着时间的流逝,您看到正在处理消息10的工作人员现在正在处理消息11、12、13等。其他是处理401、402等。
这向我表明ZMQ_PULL套接字正在将消息缓冲在某处。因此,尽管ZMQ_PUSH套接字确实具有HWM,但是PULL套接字正在快速请求消息,尽管实际上未通过调用recv()
来访问它们。因此,如果连接了PULL socket ,则将导致PUSH HWM有效地被忽略。据我所知,您无法控制PULL套接字的缓冲区长度(我希望RCVHWM套接字选项可以控制此长度,但似乎没有)。
这种行为当然会引出一个问题,即ZMQ_PULL HWM选项的意义是什么,只有在您还可以控制接收套接字HWM时才有意义。
在这一点上,我将开始询问0MQ people您是否缺少明显的东西,或者这是否被视为错误。
抱歉,我帮不上忙!
关于python - ZeroMQ:PUSH上的HWM不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21133535/