问题描述
我正在尝试使用多处理的池来运行一组进程,每个进程都将运行一个 gevent 的 greenlets 池.这样做的原因是有很多网络活动,但也有很多 CPU 活动,所以为了最大化我的带宽和我所有的 CPU 内核,我需要多个进程和 gevent 的异步猴子补丁.我正在使用多处理的管理器创建一个队列,进程将访问该队列以获取要处理的数据.
I am attempting to use multiprocessing's pool to run a group of processes, each of which will run a gevent pool of greenlets. The reason for this is that there is a lot of network activity, but also a lot of CPU activity, so to maximise my bandwidth and all of my CPU cores, I need multiple processes AND gevent's async monkey patching. I am using multiprocessing's manager to create a queue which the processes will access to get data to process.
这是代码的简化片段:
import multiprocessing
from gevent import monkey
monkey.patch_all(thread=False)
manager = multiprocessing.Manager()
q = manager.Queue()
这是它产生的异常:
Traceback (most recent call last):
File "multimonkeytest.py", line 7, in <module>
q = manager.Queue()
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
conn = self._Client(self._address, authkey=self._authkey)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
answer_challenge(c, authkey)
File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge
message = connection.recv_bytes(256) # reject large message
IOError: [Errno 35] Resource temporarily unavailable
我相信这一定是由于普通 socket 模块和 gevent 的 socket 模块的行为之间存在一些差异.
I believe this must be due to some difference between the behaviour of the normal socket module and gevent's socket module.
如果我在子进程中进行monkeypatch,则队列创建成功,但是当子进程尝试从队列中获取()时,会发生非常相似的异常.由于在子进程中执行大量网络请求,因此确实需要对套接字进行修补.
If I monkeypatch within the subprocess, The queue is created successfully, but when the subprocess tries to get() from the queue, a very similar exception occurs. The socket does need to be monkeypatched due to doing large numbers of network requests in the subprocesses.
我的 gevent 版本,我认为是最新的:
My version of gevent, which I believe is the latest:
>>> gevent.version_info
(1, 0, 0, 'alpha', 3)
有什么想法吗?
推荐答案
使用 monkey.patch_all(thread=False, socket=False)
我在类似的情况下遇到了同样的问题,并在 patch_socket()
函数下的 gevent/monkey.py
中将其追踪到第 115 行:_socket.socket = socket.socket
.将此行注释掉可以防止损坏.
I have run into the same issue in a similar situation and tracked this down to line 115 in gevent/monkey.py
under the patch_socket()
function: _socket.socket = socket.socket
. Commenting this line out prevents the breakage.
这是 gevent 用它自己的替换 stdlib socket
库的地方.multiprocessing.connection
非常广泛地使用 socket
库,显然不能容忍这种变化.
This is where gevent replaces the stdlib socket
library with its own. multiprocessing.connection
uses the socket
library quite extensively, and is apparently not tolerant to this change.
具体来说,在您导入的模块执行 gevent.monkey.patch_all()
调用而不设置 socket=False
的任何情况下,您都会看到这一点.在我的例子中,是 grequests
做到了这一点,我必须重写套接字模块的补丁来修复这个错误.
Specifically, you will see this in any scenario where a module you import performs a gevent.monkey.patch_all()
call without setting socket=False
. In my case it was grequests
that did this, and I had to override the patching of the socket module to fix this error.
这篇关于Gevent 猴子补丁破坏多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!