我有一些代码将工作添加到一个队列中,该工作由多个工作人员进行处理,然后将结果放入另一个队列中,最后一个工作人员对其进行处理。当我有多个生产者将材料添加到此结果队列时,如何可靠地向collector
流程发出信号,表明没有其他要处理的内容了?
import multiprocessing
import time
J = multiprocessing.Queue()
R = multiprocessing.Queue()
def intermediate_worker(jobs, results):
while True:
task = jobs.get()
if task is None:
jobs.put(None)
break
print 'working', task
results.put(task)
def collector(result_queue) :
total = 0
while True :
result = result_queue.get()
total += result
print 'collection', total
[multiprocessing.Process(target=intermediate_worker, args=(J,R)).start()
for i in xrange(2)]
multiprocessing.Process(target=collector, args=(R,)).start()
for chunk_dummy in xrange(10) :
J.put(chunk)
J.put(None)
最佳答案
有几种方法。一种是寻找n个毒药,其中n是您拥有的生产者数量。您已经熟悉此方法,因为您正在使用它来关闭生产者。这行之有效,但有点笨拙-您需要在消费者中使用额外的逻辑来跟踪您看到过多少毒药。
我个人最喜欢的是使用Semaphore
为我进行计数。如果我们知道有多少生产者还活着,那么足够的信息可以找出何时关闭消费者-我们在(no producers are alive)
和(the queue is empty)
时将其关闭。
J = multiprocessing.Queue()
R = multiprocessing.Queue()
S = multiprocessing.Semaphore(NUMWORKERS)
def intermediate_worker(jobs, results, sem):
with sem: #context manager handles incrementing/decrementing the semaphore
while True:
task = jobs.get()
if task is None:
jobs.put(None)
break
print 'working', task
results.put(task)
def collector(result_queue, sem) :
total = 0
while sem.get_value() < NUMWORKERS or not result_queue.empty():
result = result_queue.get()
total += result
print 'collection', total
[multiprocessing.Process(target=intermediate_worker, args=(J,R,S)).start()
for i in xrange(NUMWORKERS)]
multiprocessing.Process(target=collector, args=(R,S)).start()
粗略的代码,完全未经测试,但应该可以理解。
关于python - 告诉多个生产者的消费者没有其他结果,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/22716657/