我试图用一个服务器来编写服务器/客户端脚本,以完成任务,并由多个工作人员执行该任务。
问题是我的呼吸机执行的任务太多,以至于会在心跳中填满内存。
我试图在绑定(bind)之前设置HWM,但是没有成功。一旦工作人员连接,它就继续发送消息,而完全忽略了所设置的HWM。我还有一个接收器,用于记录已完成的任务。

server.py

import zmq

def ventilate():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
    sender.bind("tcp://*:5557")


    # Socket with direct access to the sink: used to syncronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print "Sending tasks to workers…"

    # The first message is "0" and signals start of batch
    sink.send('0')
    print "Sent starting signal"

    while True:
        sender.send("Message")



if __name__=="__main__":
    ventilate()

worker.py
import zmq
from multiprocessing import Process

def work():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process t asks forever
    while True:
        msg = receiver.recv_msg()
        print "Doing sth with msg %s"%(msg)
        sender.send("Message %s done"%(msg))

if __name__ == "__main__":
    for worker in range(10):
        Process(target=work).start()

下沉
import zmq

def sink():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()
    print "Received start signal"
    while True:
        msg = receiver.recv_msg()
        print msg


if __name__=="__main__":
    sink()

最佳答案

好的,我玩得很开心,我认为问题不在于PUSH HWM,而是您不能为PULL设置HWM。如果您查看this documentation,那么您会看到那里说不适用,表示对HWM采取的措施。

PULL套接字似乎每个都要接收数百条消息(我确实尝试设置HWM,以防万一它在PULL套接字上做任何事情。它没有。)我通过更改呼吸机来发送带有递增整数的消息,并更改了池中的每个工作程序在两次调用recv()之间等待2秒钟来证明了这一点。 worker 们打印出他们正在处理具有截然不同的整数的消息。例如,一个工作人员将处理消息10,而下一个工作人员将处理消息400。随着时间的流逝,您看到正在处理消息10的工作人员现在正在处理消息11、12、13等。其他是处理401、402等。

这向我表明ZMQ_PULL套接字正在将消息缓冲在某处。因此,尽管ZMQ_PUSH套接字确实具有HWM,但是PULL套接字正在快速请求消息,尽管实际上未通过调用recv()来访问它们。因此,如果连接了PULL socket ,则将导致PUSH HWM有效地被忽略。据我所知,您无法控制PULL套接字的缓冲区长度(我希望RCVHWM套接字选项可以控制此长度,但似乎没有)。

这种行为当然会引出一个问题,即ZMQ_PULL HWM选项的意义是什么,只有在您还可以控制接收套接字HWM时才有意义。

在这一点上,我将开始询问0MQ people您是否缺少明显的东西,或者这是否被视为错误。

抱歉,我帮不上忙!

关于python - ZeroMQ:PUSH上的HWM不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21133535/

10-12 22:20