我有一些代码将工作添加到一个队列中,该工作由多个工作人员进行处理,然后将结果放入另一个队列中,最后一个工作人员对其进行处理。当我有多个生产者将材料添加到此结果队列时,如何可靠地向collector流程发出信号,表明没有其他要处理的内容了?

import multiprocessing
import time

J = multiprocessing.Queue()
R = multiprocessing.Queue()

def intermediate_worker(jobs, results):
    while True:
        task = jobs.get()
        if task is None:
            jobs.put(None)
            break
        print 'working', task
        results.put(task)

def collector(result_queue) :
    total = 0
    while True :
        result = result_queue.get()
        total += result
        print 'collection', total


[multiprocessing.Process(target=intermediate_worker, args=(J,R)).start()
 for i in xrange(2)]

multiprocessing.Process(target=collector, args=(R,)).start()

for chunk_dummy in xrange(10) :
    J.put(chunk)
J.put(None)

最佳答案

有几种方法。一种是寻找n个毒药,其中n是您拥有的生产者数量。您已经熟悉此方法,因为您正在使用它来关闭生产者。这行之有效,但有点笨拙-您需要在消费者中使用额外的逻辑来跟踪您看到过多少毒药。

我个人最喜欢的是使用Semaphore为我进行计数。如果我们知道有多少生产者还活着,那么足够的信息可以找出何时关闭消费者-我们在(no producers are alive)(the queue is empty)时将其关闭。

J = multiprocessing.Queue()
R = multiprocessing.Queue()
S = multiprocessing.Semaphore(NUMWORKERS)

def intermediate_worker(jobs, results, sem):
    with sem: #context manager handles incrementing/decrementing the semaphore
        while True:
            task = jobs.get()
            if task is None:
                jobs.put(None)
                break
            print 'working', task
            results.put(task)

def collector(result_queue, sem) :
    total = 0
    while sem.get_value() < NUMWORKERS or not result_queue.empty():
        result = result_queue.get()
        total += result
        print 'collection', total

[multiprocessing.Process(target=intermediate_worker, args=(J,R,S)).start()
 for i in xrange(NUMWORKERS)]

multiprocessing.Process(target=collector, args=(R,S)).start()


粗略的代码,完全未经测试,但应该可以理解。

关于python - 告诉多个生产者的消费者没有其他结果,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/22716657/

10-12 18:20