我正在尝试模拟使用扭曲运行的应用程序网络。作为仿真的一部分,我希望同步某些事件,并能够为每个进程提供大量数据。我决定使用多处理事件和队列。但是,我的过程变得异常困惑。
我在下面编写了示例代码来说明问题。具体来说,(大约95%的时间是在我的沙桥计算机上),'run_in_thread'函数完成了,但是直到我按Ctrl-C之后才调用'print_done'回调。
另外,我可以更改示例代码中的几项内容,以使该工作更可靠,例如:减少产生的进程数,从reactor_ready调用self.ready.set或更改deferLater的延迟。
我猜想在扭曲的 react 堆和阻塞诸如Queue.get()或Event.wait()之类的多处理调用之间存在竞争状况吗?
我遇到的问题到底是什么?我的代码中是否存在我所缺少的错误?我可以解决此问题,还是与多处理事件/队列不兼容?
其次,会推荐像spawnProcess或Ampoule这样的替代方法吗? (如Mix Python Twisted with multiprocessing?中所建议)
编辑(按要求):
我尝试过glib2reactor selectreactor,polreactor和epollreactor的所有反应器都遇到了问题。 epollreactor似乎给出了最好的结果,并且在下面给出的示例中似乎可以很好地工作,但是在我的应用程序中仍然给我带来相同(或相似)的问题。我将继续调查。
我正在运行Gentoo Linux内核3.3和3.4,python 2.7,并且尝试了Twisted 10.2.0、11.0.0、11.1.0、12.0.0和12.1.0。
除了我的沙桥机之外,我在双核amd机上也看到了相同的问题。
#!/usr/bin/python
# -*- coding: utf-8 *-*
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task
from multiprocessing import Process
from multiprocessing import Event
class TestA(Process):
def __init__(self):
super(TestA, self).__init__()
self.ready = Event()
self.ready.clear()
self.start()
def run(self):
reactor.callWhenRunning(self.reactor_ready)
reactor.run()
def reactor_ready(self, *args):
task.deferLater(reactor, 1, self.node_ready)
return args
def node_ready(self, *args):
print 'node_ready'
self.ready.set()
return args
def reactor_running():
print 'reactor_running'
df = threads.deferToThread(run_in_thread)
df.addCallback(print_done)
def run_in_thread():
print 'run_in_thread'
for n in processes:
n.ready.wait()
def print_done(dfResult=None):
print 'print_done'
reactor.stop()
if __name__ == '__main__':
processes = [TestA() for i in range(8)]
reactor.callWhenRunning(reactor_running)
reactor.run()
最佳答案
简短的答案是肯定的,Twisted和多重处理彼此不兼容,并且您无法像尝试那样可靠地使用它们。
在所有POSIX平台上,子流程管理与SIGCHLD
处理紧密相关。 POSIX信号处理程序是进程全局的,每种信号类型只能有一个。
Twisted和stdlib multiprocessing
不能同时安装SIGCHLD
处理程序。其中只有一个可以。这意味着它们中只有一个可以可靠地管理子进程。您的示例应用程序无法控制它们中的哪一个将赢得该功能,因此我希望由该事实引起的行为不确定性。
但是,您的示例更直接的问题是,您在父进程中加载了Twisted,然后使用multiprocessing
进行 fork ,而不是执行所有子进程。 Twisted不支持这样使用。如果先 fork 然后执行,就没有问题。但是,缺少新进程的执行程序(也许是使用Twisted的Python进程)会导致Twisted无法解决的所有额外共享状态。在您的特定情况下,导致此问题的共享状态是内部“waker fd”,用于实现deferToThread
。在父级和所有子级之间共享fd的情况下,当父级尝试唤醒主线程以传递deferToThread
调用的结果时,它很可能会唤醒子级进程之一。子进程无用可做,因此只是浪费时间。同时,父线程中的主线程永远不会唤醒,也永远不会注意到线程任务已完成。
您可以通过在创建子进程之前不加载任何Twisted来避免此问题。就Twisted而言,这会将您的用法转变为单进程用例(在每个进程中,将首先加载它,然后再完全停止该进程,因此毫无疑问如何进行fork和fork)。扭曲互动了)。这意味着,直到创建子进程之后,才导入Twisted。
当然,这只会对Twisted有所帮助。您使用的任何其他库都可能遇到类似的麻烦(您提到了glib2,这是另一个库的一个很好的示例,如果您尝试像这样使用它,它将完全阻塞)。
我强烈建议完全不使用multiprocessing
模块。相反,请使用涉及fork和exec的任何多进程方法,而不要单独使用fork。安瓿瓶属于这一类。
关于python - 与多处理事件和队列不兼容吗?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/11272874/