问题描述
发布的代码启动两个异步进程.第一个publisher
Process 将数据发布到Queue
,第二个subscriber
Process 读取Queue
中的数据并将其记录到控制台.
The posted code starts two async Processes. The first publisher
Process publishes data to the Queue
, the second subscriber
Process reads the data from the Queue
and logs it to console.
为了保证Queue不被同时访问,在从Queue
中获取数据之前,subscribe
函数首先执行lock.acquire()
,然后用data = q.get()
获取数据,最后用lock.release()
语句释放锁.
To make sure the Queue is not accessed at the same time, before getting the data from the Queue
, the subscribe
function first executes lock.acquire()
, then gets the data with data = q.get()
and finally releases the lock with the lock.release()
statement.
publish
函数中使用了相同的锁定释放序列.但是自从在 publish
中获取 lock
后,我不得不注释两行函数使脚本停止.为什么?
The same locking-releasing sequence is used in publish
function. But I had to comment two lines since acquiring the lock
in publish
function brings the script to halt. Why?
import multiprocessing, time, uuid, logging
log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)
queue = multiprocessing.Queue()
lock = multiprocessing.Lock()
def publish(q):
for i in range(20):
data = str(uuid.uuid4())
# lock.acquire()
q.put(data)
# lock.release()
log.info('published: %s to queue: %s' % (data, q))
time.sleep(0.2)
def subscribe(q):
while True:
lock.acquire()
data = q.get()
lock.release()
log.info('.......got: %s to queue: %s' % (data, q))
time.sleep(0.1)
publisher = multiprocessing.Process(target=publish, args=(queue,))
publisher.start()
subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
subscriber.start()
推荐答案
并且它们支持内部block
ing机制(参见get
/put
方法的签名).
在您的情况下,您不需要 lock
.
And they support internal block
ing mechanism (see the signatures of get
/put
methods).
You don't need a lock
in your case.
import multiprocessing, time, uuid, logging
log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)
queue = multiprocessing.Queue()
def publish(q):
for i in range(20):
data = str(uuid.uuid4())
q.put(data)
log.info('published: %s to queue: %s' % (data, q))
time.sleep(0.2)
q.put(None)
def subscribe(q):
while True:
data = q.get()
if data is None:
log.info('....... end of queue consumption')
break
log.info('.......got: %s to queue: %s' % (data, q))
time.sleep(0.1)
publisher = multiprocessing.Process(target=publish, args=(queue,))
publisher.start()
subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
subscriber.start()
publisher.join()
subscriber.join()
示例输出:
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] published: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] .......got: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] process shutting down
[INFO/Process-2] ....... end of queue consumption
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[INFO/Process-2] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
Process finished with exit code 0
这篇关于如何使用带锁的多处理队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!