我从这个问题中得到了线索怎么可能我将 send_json 与 pyzmq PUB SUB 一起使用 所以看起来 PUB/SUB 架构有同样的问题,毫无疑问其他人也是如此.这在文档中有描述,但不是很清楚http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern更新事实上,我发现在我的情况下,我可以通过直接使用消息信封的客户端 ID"部分来进一步简化代码.所以工人只是这样做:context = zmq.Context()socket = context.socket(zmq.DEALER)socket.identity = str(os.getpid()) # 或者我可以省略它并使用ØMQ客户端IDsocket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))socket.send_json(真)另外值得注意的是,当你想从 ROUTER 向另一个方向发送消息时,你必须将它作为多部分发送,指定它的目的地是哪个客户端,例如:coordinator.pycontext = zmq.Context()socket = context.socket(zmq.ROUTER)端口 = socket.bind_to_random_port(ZMQ_ADDRESS)轮询器 = zmq.Poller()poller.register(套接字,zmq.POLLIN)pids = set()为真:事件 = poller.poll(1)如果不是事件:继续process_id, val = socket.recv_json()pids.add(process_id)# 这里需要一些代码来决定什么时候停止监听# 并打破循环对于pids中的pid:socket.send_multipart([pid, 'a string message'])# ^ 如果需要,做你自己的 json 编码我想可能有一些 ØMQ 方式来做广播消息,而不是像我上面那样循环发送到每个客户端.我希望文档对每种可用的套接字类型以及如何使用它们有一个清晰的描述.Here is my code with the extraneous stuff stripped out:coordinator.pycontext = zmq.Context()socket = context.socket(zmq.ROUTER)port = socket.bind_to_random_port(ZMQ_ADDRESS)poller = zmq.Poller()poller.register(socket, zmq.POLLIN)while True: event = poller.poll(1) if not event: continue process_id, val = socket.recv_json()worker.pycontext = zmq.Context()socket = context.socket(zmq.DEALER)socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))socket.send_json( (os.getpid(), True))what happens when I run it: process_id, val = socket.recv_json() File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json return jsonapi.loads(msg) File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads return jsonmod.loads(s, **kwargs) File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads return _default_decoder.decode(s) File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode obj, end = self.raw_decode(s) File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode raise JSONDecodeError("No JSON object could be decoded", s, idx)JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)and if I dig in with ipdb:> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json() 379 msg = self.recv(flags)--> 380 return jsonapi.loads(msg) 381ipdb> p msg'\x00\x9f\xd9\x06\xa2'hmm, that doesn't look like JSON... is this a bug in pyzmq? am I using it wrong? 解决方案 Hmm, ok, found the answer.There is an annoying asymmetry in the ØMQ interface, so you have to be aware of the type of socket you are using.In this case my use of ROUTER/DEALER architecture means that the JSON message sent from the DEALER socket, when I do send_json, gets wrapped in multipart message envelope. The first part is a client id (I guess this is the '\x00\x9f\xd9\x06\xa2' that I got above) and the second part is the JSON string we are interested in.So in the last line of my coordinator.py I need to do this instead:id_, msg = socket.recv_multipart()process_id, val = json.loads(msg)IMHO this is bad design on the part of ØMQ/pyzmq, the library should abstract this away and have just send and recv methods, that just work.I got the clue from this question How can I use send_json with pyzmq PUB SUB so it looks like PUB/SUB architecture has the same issue, and no doubt others too.This is described in the docs but it's not very clearhttp://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-PatternUpdateIn fact, I found in my case I could simplify the code further, by making use of the 'client id' part of the message envelope directly. So the worker just does:context = zmq.Context()socket = context.socket(zmq.DEALER)socket.identity = str(os.getpid()) # or I could omit this and use ØMQ client idsocket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))socket.send_json(True)It's also worth noting that when you want to send a message the other direction, from the ROUTER, you have to send it as multipart, specifying which client it is destined for, eg:coordinator.pycontext = zmq.Context()socket = context.socket(zmq.ROUTER)port = socket.bind_to_random_port(ZMQ_ADDRESS)poller = zmq.Poller()poller.register(socket, zmq.POLLIN)pids = set()while True: event = poller.poll(1) if not event: continue process_id, val = socket.recv_json() pids.add(process_id) # need some code in here to decide when to stop listening # and break the loopfor pid in pids: socket.send_multipart([pid, 'a string message']) # ^ do your own json encoding if requiredI guess there is probably some ØMQ way of doing a broadcast message rather than sending to each client in a loop as I do above. I wish the docs just had a clear description of each available socket type and how to use them. 这篇关于pyzmq recv_json 无法解码 send_json 发送的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云! 09-03 08:52