Problem description
Inspired by this solution, I am trying to set up a multiprocessing pool of worker processes in Python. The idea is to pass some data to the worker processes before they actually start their work and to reuse it eventually. The goal is to minimize the amount of data that needs to be packed/unpacked for every call into a worker process (i.e. to reduce inter-process communication overhead). My MCVE looks like this:
import multiprocessing as mp
import numpy as np

def create_worker_context():
    global context # create "global" context in worker process
    context = {}

def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
    }) # store context information in global namespace of worker
    return True # return True, verifying that the worker process received its data

class data_analysis:

    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
            ) for core_id in range(self.CPU_LEN)
        ] # pass information to workers' context
        result_batches = [result.get() for result in pool_results] # check if they got the information
        if not all(result_batches): # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
        return context['tmp'] # return result

    def process_all(self):
        input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
        pool_results = [
            self.cpu_pool.apply_async(
                data_analysis.process_batch,
                args = (input_data,)
            ) for _ in range(self.CPU_LEN)
        ] # let workers actually work
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0]) # show result

if __name__ == '__main__':
    data_analysis().process_all()
I am running the above with CPython 3.6.6.
The strange thing is ... sometimes it works, sometimes it does not. If it does not work, the process_batch method throws an exception because it cannot find some_const_array as a key in context. The full traceback looks like this:
(env) me@box:/path> python so.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/so.py", line 37, in process_batch
    context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/path/so.py", line 54, in <module>
    data_analysis().process_all()
  File "/path/so.py", line 48, in process_all
    result_batches = [result.get() for result in pool_results]
  File "/path/so.py", line 48, in <listcomp>
    result_batches = [result.get() for result in pool_results]
  File "/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
KeyError: 'some_const_array'
I am puzzled. What is going on here?
If my context dictionaries contain an object of "higher type", e.g. a database driver or similar, I am not getting this kind of problem. I can only reproduce this if my context dictionaries contain basic Python data types, collections or numpy arrays.
(Is there a potentially better approach for achieving the same thing in a more reliable manner? I know my approach is considered a hack ...)
Recommended answer
You need to relocate the content of init_worker_context into your initializer function create_worker_context.
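A minimal, self-contained sketch of that change (not the answer's own code): the data every worker needs is handed over via initargs, so it is guaranteed to be in place before any task runs. Since initargs are identical for every worker, the hand-assigned worker_id from the question no longer fits this scheme; using os.getpid() in its place is my assumption for illustration.

import os
import multiprocessing as mp
import numpy as np

DIMS = 100
DTYPE = 'float32'

def create_worker_context(some_const_array, dims, dtype):
    # Runs exactly once in every worker process, right after it starts,
    # before the worker picks up any task from the queue.
    global context
    context = {
        'worker_id': os.getpid(),  # assumption: PID instead of a hand-assigned id
        'some_const_array': some_const_array,
        'tmp': np.zeros((dims, dims), dtype=dtype),
    }

def process_batch(batch_data):
    # Same "fancy computation" as in the question, reusing the worker's context.
    context['tmp'][:, :] = context['some_const_array'] + batch_data
    return context['tmp']

if __name__ == '__main__':
    some_const_array = np.zeros((DIMS, DIMS), dtype=DTYPE)
    input_data = np.arange(0, DIMS ** 2, dtype=DTYPE).reshape(DIMS, DIMS)
    with mp.Pool(processes=mp.cpu_count(),
                 initializer=create_worker_context,
                 initargs=(some_const_array, DIMS, DTYPE)) as pool:
        results = [pool.apply_async(process_batch, args=(input_data,))
                   for _ in range(mp.cpu_count())]
        print(sum(r.get() for r in results))  # reduce the batches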
Your assumption that every single worker process will run init_worker_context is responsible for your confusion. The tasks you submit to a pool get fed into one internal task queue that all worker processes read from. What happens in your case is that some worker processes complete their task and compete again for new tasks. So it can happen that one worker process executes multiple tasks while another one does not get a single task at all. Keep in mind that the OS schedules runtime for the threads (of the worker processes).
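A tiny demo of that behaviour (illustrative only; the helper report_pid is not part of the original post): with tasks this cheap, the number of distinct PIDs that come back is often smaller than the number of submitted tasks, because a fast worker keeps pulling work from the shared queue before the others get to it.

import os
import multiprocessing as mp

def report_pid(i):
    return os.getpid()  # just report which worker process ran this task

if __name__ == '__main__':
    n_workers = mp.cpu_count()
    with mp.Pool(processes=n_workers) as pool:
        async_results = [pool.apply_async(report_pid, args=(i,))
                         for i in range(n_workers)]
        pids = [r.get() for r in async_results]
        print(f'{n_workers} tasks ran in {len(set(pids))} distinct worker process(es)')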