Problem description
As you can tell from the title, I'm trying to use PriorityQueue with multiprocessing. More precisely, I wanted to make a shared PriorityQueue. I wrote some code, but it doesn't run as I expected.
Here's the code:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue

def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)
    print "worker", queue.qsize()

pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
I get the following output:
worker 100
main 0
What's happening, and how do I do what I want the right way? Thank you.
Recommended answer
The problem isn't that the queue can't be pickled in this case. If you're using a Unix-like platform, the queue can be passed to the child without pickling, because the child is created with fork() and inherits a copy of the parent's memory. (On Windows, I think you would get a pickling error here, though.) The root problem is that you're not using a process-safe queue: the child's 100 put() calls fill its own copy of pr_queue, while the parent's queue stays empty, which is why main prints 0. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module. Unfortunately, that module has no PriorityQueue implementation. However, you can easily create one by registering PriorityQueue with a multiprocessing manager class (a subclass of SyncManager), like this:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue

class MyManager(SyncManager):
    pass

MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    # Start the manager's server process and return the manager object
    m = MyManager()
    m.start()
    return m

def worker(queue):
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()

if __name__ == "__main__":  # needed where children are spawned rather than forked (e.g. Windows)
    m = Manager()
    pr_queue = m.PriorityQueue()  # This is process-safe
    worker_process = Process(target=worker, args=(pr_queue,))
    worker_process.start()
    time.sleep(5)  # nope, race condition, you shall not pass (probably)
    print "main", pr_queue.qsize()
Output:
worker 100
main 100
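The example above still waits for the worker with time.sleep(5). Here is a minimal sketch of the same Manager approach that waits with Process.join() instead and shows that items come back out in priority order, not insertion order. It assumes Python 2.7 or 3 (the import falls back between the Queue and queue module names); PQManager, producer and drain_in_priority_order are illustrative names, not part of any library.

```python
from __future__ import print_function
from multiprocessing import Process
from multiprocessing.managers import SyncManager

try:
    from queue import PriorityQueue  # Python 3 module name
except ImportError:
    from Queue import PriorityQueue  # Python 2 module name


class PQManager(SyncManager):
    pass


# The real PriorityQueue lives in the manager's server process;
# callers only ever hold proxies to it.
PQManager.register("PriorityQueue", PriorityQueue)


def producer(queue):
    # Put items out of order; the server-side heap keeps them sorted.
    for i in (5, 1, 4, 2, 3):
        queue.put(i)


def drain_in_priority_order():
    manager = PQManager()
    manager.start()
    try:
        pr_queue = manager.PriorityQueue()
        worker = Process(target=producer, args=(pr_queue,))
        worker.start()
        worker.join()  # proxy put() calls are synchronous, so all 5 items are in
        return [pr_queue.get() for _ in range(pr_queue.qsize())]
    finally:
        manager.shutdown()


if __name__ == "__main__":
    print(drain_in_priority_order())  # → [1, 2, 3, 4, 5]
```

Joining the worker removes the race that the sleep only makes unlikely, and the main-module guard keeps the sketch usable on platforms that spawn rather than fork child processes.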
Note that this probably won't perform quite as well as it would if it were a standard multiprocessing.Queue subclass. The Manager-based PriorityQueue is implemented by creating a Manager server process that actually contains a regular PriorityQueue, and then providing your main and worker processes with Proxy objects that use IPC to read from and write to the queue in the server process. A regular multiprocessing.Queue just writes data to and reads data from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating to multiprocessing.Queue. It may not be worth the effort, though.
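A rough sketch of the delegation idea, under strong assumptions: exactly one process calls get(), and priority ordering only applies to items that have already reached the pipe when get() runs. SingleConsumerPriorityQueue, producer and demo are hypothetical names. The transport is an ordinary multiprocessing.Queue (FIFO), and the ordering comes from a heapq heap that is private to the consumer, so every process that receives a copy of the object has its own separate buffer.

```python
from __future__ import print_function
import heapq
import multiprocessing

try:
    import queue as std_queue  # Python 3
except ImportError:
    import Queue as std_queue  # Python 2


class SingleConsumerPriorityQueue(object):
    """Hypothetical sketch: FIFO transport via multiprocessing.Queue,
    priority order restored in a heap owned by the single consumer."""

    def __init__(self):
        self._fifo = multiprocessing.Queue()  # process-safe FIFO over a pipe
        self._heap = []                       # consumer-local buffer (not shared)

    def put(self, item):
        self._fifo.put(item)

    def get(self, timeout=None):
        if not self._heap:
            # Block until at least one item has arrived.
            heapq.heappush(self._heap, self._fifo.get(timeout=timeout))
        # Move everything else already in the pipe into the heap, without blocking.
        while True:
            try:
                heapq.heappush(self._heap, self._fifo.get_nowait())
            except std_queue.Empty:
                break
        return heapq.heappop(self._heap)


def producer(queue):
    for i in (5, 1, 4, 2, 3):
        queue.put(i)


def demo():
    pq = SingleConsumerPriorityQueue()
    worker = multiprocessing.Process(target=producer, args=(pq,))
    worker.start()
    worker.join()  # an exiting child flushes its buffered items into the pipe
    return [pq.get() for _ in range(5)]


if __name__ == "__main__":
    print(demo())
```

Because the producer is joined before reading, demo() should return [1, 2, 3, 4, 5]. If the consumer reads while producers are still running, an item can be returned before a smaller one that has not arrived yet, which is one reason the Manager-based version is easier to reason about.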
This concludes the article on strange Queue.PriorityQueue behavior with multiprocessing in Python 2.7.6.