I'm trying to write simple producer-consumer code using processes. The producer is a single Process. For the consumers, I'm taking processes from a Pool.

from multiprocessing import Manager, Process, Pool
from time import sleep
def writer(queue):
   for i in range(10):
     queue.put(i)
     print 'put 1 size now ',queue.qsize()
     sleep(1)

def reader(queue):
   print 'in reader'
   for i in range(10):
     queue.get(1)
     print 'got 1 size now ', queue.qsize()

if __name__ == '__main__':
    q = Manager().Queue()
    p = Process(target=writer, args=(q,))
    p.start()
    pool = Pool()
    c = pool.apply_async(reader,q)

But I get this error:
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "pc.py", line 5, in writer
    queue.put(i)
  File "<string>", line 2, in put
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

Can anyone point out where I went wrong?

Best answer

Hi, I found this answer:

from multiprocessing import Process, Queue
from Queue import Empty  # Python 2 module name; raised when get() times out

def writer(queue):
   for i in range(10):
     queue.put(i)
     print 'put %i size now %i'%(i, queue.qsize())

def reader(id, queue):
   for i in range(10):
     try:
       cnt = queue.get(True, 1)  # block for at most 1 second
       print '%i got %i size now %i'%(id, cnt, queue.qsize())
     except Empty:
       pass

class Managerss:
   def __init__(self):
     self.queue= Queue()
     self.NUMBER_OF_PROCESSES = 3

   def start(self):
     self.p = Process(target=writer, args=(self.queue,))
     self.p.start()
     self.workers = [Process(target=reader, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
     for w in self.workers:
       w.start()

   def join(self):
     self.p.join()
     for w in self.workers:
       w.join()

if __name__ == '__main__':
    m= Managerss()
    m.start()
    m.join()
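
For comparison, the original Pool-based approach can also be made to work. A likely cause of the Broken pipe in the question is that the main process reaches the end of the script without waiting for anything, so the manager process hosting the queue shuts down while the writer is still calling put. Separately, apply_async expects its arguments as a tuple, (q,), not a bare q. The sketch below is written for Python 3 (an assumption; the question uses Python 2.7), and the names N_ITEMS and run are illustrative only.

```python
from multiprocessing import Manager, Pool, Process

N_ITEMS = 10  # hypothetical item count, mirroring range(10) in the question


def writer(queue):
    # Producer: put N_ITEMS integers on the shared queue.
    for i in range(N_ITEMS):
        queue.put(i)


def reader(queue):
    # Consumer: take N_ITEMS integers off the queue and return them.
    # The timeout keeps the worker from hanging forever if the producer dies.
    return [queue.get(timeout=5) for _ in range(N_ITEMS)]


def run():
    # All waiting happens inside the with-blocks, so the manager process
    # (which owns the queue) stays alive until both sides have finished.
    with Manager() as manager:
        q = manager.Queue()
        p = Process(target=writer, args=(q,))
        p.start()
        with Pool(processes=1) as pool:
            result = pool.apply_async(reader, (q,))  # args must be a tuple
            items = result.get(timeout=30)           # wait for the consumer
        p.join()                                     # wait for the producer
    return items


if __name__ == '__main__':
    print(run())
```

Calling result.get() also re-raises any exception from the worker, which helps here: without it, a mistake such as passing q instead of (q,) fails silently inside the pool.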

Hope this helps.

About python - Python producer/consumer with Process and Pool: a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/15859374/
