本文介绍了python多处理进程池无法找到异步函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
相当简单的多处理示例.目标:
Pretty simple multiprocessing example. Goals:
- 使用
mp.Pool
创建流程工作者池 - 进行某种转换(这里是
line
上的简单字符串操作) - 将转换后的行推到
mp.Queue
- 随后从主程序中的
mp.Queue
中进一步获取过程数据
- Create a pool of process workers using
mp.Pool
- Do some sort of transformation (here a simple string operation on
line
) - Push the transformed line to
mp.Queue
- Further process data from that
mp.Queue
in the main program afterwards
因此,请执行以下操作:
So lets do this:
import multiprocessing as mp
使用mp.queue初始化异步进程
Init async processes with a mp.queue
def process_pool_init_per_process(q):
global mp_queue
mp_queue = q
真正初始化mp_pool
Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
每个要异步处理的line
都会被调用
This is getting called for every line
to be proccesed async
def process_async_main(line):
print(line)
q.put(line + '_asynced')
现在让我们使用apply_async
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
从队列中读取
while not q.empty():
print(q.get()) # This should be the inital line
失败:
python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>
问题是:为什么多处理程序找不到主类?
要复制的完整代码:
import multiprocessing as mp
##### Init async processes
def process_pool_init_per_process(q):
global mp_queue
mp_queue = q
# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
#This is getting called for every line to be proccesed async
def process_async_main(line):
print(line)
q.put(line + '_asynced')
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
while not q.empty():
print(q.get()) # This should be the inital line
推荐答案
好...我知道了...由于某些奇怪的原因, multiprocessing
无法在其中同步功能与同步代码相同的文件.
Ok... I´ve got it... For some strange reason multiprocessing
is not able to have the function to be asynced in the same file as the synchronized code.
编写如下代码:
asynced.py
##### Init async processes
def process_pool_init_per_process(q):
global mp_queue
mp_queue = q
##### Function to be asycned
def process_async_main(line):
print(line)
mp_queue.put(line + '_asynced')
而不是mp_process_example.py
:
import multiprocessing as mp
from asynced import process_async_main, process_pool_init_per_process
# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()
while not q.empty():
print(q.get()) # This should be the inital line + "_asynced"
按预期工作:
$ python3 mp_process_example.py
Hi, this is a test to test mp_queues with mp process pools
Hi, this is a test to test mp_queues with mp process pools_asynced
这篇关于python多处理进程池无法找到异步函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!