Problem description
Ok, since there are currently no answers I don't feel too bad doing this. While I'm still interested in what is actually happening behind the scenes to cause this problem, my most urgent questions are those specified in update 2. Those being:

What are the differences between a JoinableQueue and a Manager().Queue() (and when should you use one over the other)? And importantly, is it safe to replace one with the other in this example?
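As a quick point of reference before the main example, here is a minimal, illustrative sketch of how the two objects are obtained. Both expose put()/get(), but JoinableQueue is a pipe-backed queue fed by a background thread in the putting process, whereas Manager().Queue() returns only a proxy to a queue held in a separate manager process. The names jq, mq and manager below are just for illustration.

import multiprocessing

if __name__ == '__main__':
    # Pipe-backed queue; put() hands items to a background "feeder" thread
    # in the putting process, which writes them to the underlying pipe.
    jq = multiprocessing.JoinableQueue()

    # Manager-backed queue; Manager() starts a separate server process and
    # mq is only a proxy, so put() delivers the item straight to that process.
    manager = multiprocessing.Manager()
    mq = manager.Queue()

    jq.put('x')
    print(jq.get())
    jq.task_done()
    jq.join()          # returns because every item has been marked done

    mq.put('y')
    print(mq.get())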
In the following code, I have a simple process pool. Each process is passed the process queue (pq) to pull data to be processed from, and a return-value queue (rq) to pass the returned values of the processing back to the main thread. If I don't append to the return-value queue it works, but as soon as I do, for some reason the processes are blocked from stopping. In both cases the processes' run methods return, so it's not the put on the return queue that blocks; but in the second case the processes themselves do not terminate, so the program deadlocks when I join on the processes. Why would this be?
Updates:

1. It seems to have something to do with the number of items in the queue. On my machine at least, I can have up to 6570 items in the queue and it actually works, but any more than this and it deadlocks.

2. It seems to work with Manager().Queue(). Whether it's a limitation of JoinableQueue or just me misunderstanding the differences between the two objects, I've found that if I replace the return queue with a Manager().Queue(), it works as expected. What are the differences between them, and when should you use one over the other?

3. The error does not occur if I'm consuming from rq.
Oop. There was an answer here for a moment, and as I was commenting on it, it disappeared. Anyway, one of the things it questioned was whether this error still occurs if I add a consumer. I have tried this, and the answer is: no, it doesn't. The other thing it mentioned was this quote from the multiprocessing docs as a possible key to the problem. Referring to JoinableQueues, it says:

    As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe.

    This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.
import multiprocessing


class _ProcSTOP:        # sentinel object used to tell the workers to stop
    pass


class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq           # process (work) queue
        self._rq = rq           # return-value queue
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while dat is not _ProcSTOP:
            # self._rq.put(dat)     # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done()        # mark the sentinel as done
        print('==', self.name)

    def __del__(self):
        print('--', self.name)


if __name__ == '__main__':
    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq)
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()                       # wait until every item has been task_done()'d

    for i in range(4):
        pq.put(_ProcSTOP)

    pq.join()                       # wait until the sentinels have been consumed

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()           # hangs here (if using rq)

    print('** complete')
Sample output, not using return-queue:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4
Sample output, using return queue:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs
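For what it's worth, here is a sketch of the change that matches update 3 above: draining rq in the main process before joining the workers. It reuses Proc and _ProcSTOP from the example, assumes the self._rq.put(dat) line is uncommented, and is meant only as an illustration, not a definitive fix.

import multiprocessing

if __name__ == '__main__':
    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq)
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
        pq.put(_ProcSTOP)

    pq.join()

    # Drain the return queue before joining: exactly one result was put for
    # each of the 10000 work items, and consuming them lets each worker's
    # feeder thread flush its pipe so the process can actually exit.
    results = [rq.get() for _ in range(10000)]

    while len(pool) > 0:
        pool.pop().join()           # no longer hangs once rq has been drained

    print('** complete,', len(results), 'results')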
Recommended answer

From the documentation (the warning quoted above), the JoinableQueue() uses a pipe and will wait until it can flush all data before closing.

On the other hand, a Manager.Queue() object uses a completely different approach: managers run a separate process that receives all data immediately (and stores it in its memory).
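Consistent with that explanation, here is a minimal sketch of the swap described in update 2: only the construction of rq changes, because each put() on a manager queue is delivered straight to the manager's server process, so the worker has no buffered data left to flush when it exits. Again this reuses Proc and _ProcSTOP from the example above with self._rq.put(dat) uncommented, and is only an illustration.

import multiprocessing

if __name__ == '__main__':
    manager = multiprocessing.Manager()

    pq = multiprocessing.JoinableQueue()
    rq = manager.Queue()            # proxy to a queue held by the manager process

    pool = []
    for i in range(4):
        p = Proc(pq, rq)
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)
    pq.join()

    for i in range(4):
        pq.put(_ProcSTOP)
    pq.join()

    while len(pool) > 0:
        pool.pop().join()           # workers exit: their puts already reached the manager

    # The results are still sitting in the manager's queue and can be read here.
    results = [rq.get() for _ in range(10000)]
    print('** complete,', len(results), 'results')

Either approach avoids the hang; the manager queue trades an extra server process and per-item IPC overhead for not having to drain the return queue before joining.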