我对使用python进行队列处理感到好奇。让我先描述一下:
主线程创建一个负责收集和处理消息的全局对象(该对象内部有一个队列)
该全局对象具有在线程中启动的方法。方法有权访问队列(属于该对象)并从中读取消息
主线程启动了一系列进程(带有multiprocessing.Process),这些进程将消息发布到全局对象。
问题是:对于队列处理器,队列始终为空。让我用一些代码说明问题:
import threading as t
import time
import sys
from multiprocessing import Process
if sys.version_info.major is 2:
import Queue as queue
else:
import queue
class ExampleRecorder:
queue = queue.Queue()
def run(self):
self.thread = t.Thread(target=self.start_processor)
self.thread.daemon = True
self.thread.start()
def start_processor(self):
while 1:
print("PROCESSOR! QUEUE ID: {}. QUEUE SIZE: {}. IS EMPTY: {}".format(id(self.queue), self.queue.qsize(), self.queue.empty()))
time.sleep(1)
def push_message(self, span):
self.queue.put(span)
print("RECORDER! QUEUE ID: {}. QUEUE SIZE: {}".format(id(self.queue), self.queue.qsize()))
er = ExampleRecorder()
er.run()
def producer():
while 1:
print("Adding an item")
er.push_message("foo")
time.sleep(1)
proc = Process(target=producer)
proc.start()
该脚本的示例输出为:
$ python3 model.py
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True
Adding an item
RECORDER! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 1
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True
Adding an item
RECORDER! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 2
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True
如您所见,队列接收对象并增长,但是对于处理器而言,它始终为空。那里可能出什么问题了?在进程中的线程中处理共享队列是否有些棘手?
附言经过Python 2.7.12和3.5.2检查
最佳答案
Queue.Queue
仅在同一进程中工作,它与线程一起使用,而不与单独的进程一起使用。
您需要一个multiprocessing.Queue
实例进行进程间通信,但是您还必须重组代码以将队列的实例显式传递给producer
。现在,在新过程中评估ExampleRecorder
定义时,每个生产者将创建一个不同的实例。
注意:使用您编写的代码,所有ExampleRecorder
实例(在同一进程中)共享同一队列!您确定这是您想要的吗?
通过在queue
块内定义class ...
,queue
是类的属性,而不是其实例。这与在queue
中定义__init__()
非常不同。一个简单的例子:
from queue import Queue
class SampleClass:
queue = Queue()
class AnotherSample:
def __init__(self):
self.queue = Queue()
inst1 = SampleClass()
inst2 = SampleClass()
inst3 = AnotherSample()
inst4 = AnotherSample()
如果我们测试
queue
属性,我们会看到区别:(is
运算符测试两个变量是否是同一对象的别名)inst1.queue is inst2.queue
Out[8]: True
inst3.queue is inst4.queue
Out[9]: False
关于python - Python:进程中线程中的队列处理很奇怪,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46281521/