问题描述
我想使用 mutliprocessing.pool 方法并行计算.问题是我想在计算中使用的函数有两个 args 和可选的 kwargs,第一个参数是数据帧,第二个参数是 str,任何 kwargs 是字典.
I would like to parallelise a calculation using the mutliprocessing.pool method. The problem is that the function I would like to use in the calculation presents two args and optional kwargs, being the first argument a dataframe, the second one a str and any kwargs a dictionary.
对于我尝试执行的所有计算,我想要使用的数据框和字典都是相同的,只是第二个不断变化的参数.因此,我希望能够使用 map 方法将它作为不同字符串的列表传递给已经使用 df 和 dict 打包的函数.
Both the dataframe and the dictionary I want to use are the same for all the calculations I am trying to carry out, being only the second arg the one that keeps changing. I was therefore hoping to be able to pass it as a list of different strings using the map method to the already packed function with the df and dict.
from utils import *
import multiprocessing
from functools import partial
def sumifs(df, result_col, **kwargs):
compare_cols = list(kwargs.keys())
operators = {}
for col in compare_cols:
if type(kwargs[col]) == tuple:
operators[col] = kwargs[col][0]
kwargs[col] = list(kwargs[col][1])
else:
operators[col] = operator.eq
kwargs[col] = list(kwargs[col])
result = []
cache = {}
# Go through each value
for i in range(len(kwargs[compare_cols[0]])):
compare_values = [kwargs[col][i] for col in compare_cols]
cache_key = ','.join([str(s) for s in compare_values])
if (cache_key in cache):
entry = cache[cache_key]
else:
df_copy = df.copy()
for compare_col, compare_value in zip(compare_cols, compare_values):
df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
entry = df_copy[result_col].sum()
cache[cache_key] = entry
result.append(entry)
return pd.Series(result)
if __name__ == '__main__':
ca = read_in_table('Tab1')
total_consumer_ids = len(ca)
base = pd.DataFrame()
base['ID'] = range(1, total_consumer_ids + 1)
result_col= ['A', 'B', 'C']
keywords = {'Z': base['Consumer archetype ID']}
max_number_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=max_number_processes) as pool:
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
print(results)
但是,当我运行上面的代码时,我收到以下错误:TypeError: sumifs() missing 1 required positional argument: 'result_col'
.我如何为函数提供第一个 arg 和 kwargs,同时提供第二个参数作为 str 列表,以便我可以并行计算?我在论坛中阅读了几个类似的问题,但似乎没有一个解决方案适用于这种情况......
However, when I run the code above I get the following error: TypeError: sumifs() missing 1 required positional argument: 'result_col'
. How could I provide the function with the first arg and kwargs, while providing the second argument as a list of str so I can paralelise the calculation? I have read several similar questions in the forum but none of the solutions seem to work for this case...
谢谢,如果有什么不明白的地方,我很抱歉,我今天才知道多处理包!
Thank you and apologies if something is not clear, I just learnt of the multiprocessing package today!
推荐答案
让我们看看你的代码的两部分.
Let's have a look at two part of your code.
首先是sumifs
函数声明:
def sumifs(df, result_col, **kwargs):
其次,使用相关参数调用此函数.
Secondly, the call to this function with the relevant parameters.
# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}
# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)
更新 1:
修改原代码后,看来是位置参数赋值的问题,尽量舍弃.
Update 1:
After the original code has been edited.It look like the problem is the positional argument assignment, try to discard it.
替换行:
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
与:
results = pool.map(partial(sumifs, ca, **keywords), result_col)
示例代码:
import multiprocessing
from functools import partial
def test_func(arg1, arg2, **kwargs):
print(arg1)
print(arg2)
print(kwargs)
return arg2
if __name__ == '__main__':
list_of_args2 = [1, 2, 3]
just_a_dict = {'key1': 'Some value'}
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
print(results)
将输出:
This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
['1', '2', '3']
关于如何Multiprocessing.pool 具有多个 args 和 kwargs 的函数
扩展示例(由于评论):
Extended example (due to comments):
然而,我想知道,如果我的函数有三个 args 和 kwargs,并且我想保持 arg1、arg3 和 kwargs 不变,我怎么能将 arg2 作为多处理列表传递?本质上,我将如何指示映射的多处理(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) 部分中的第二个值对应于 arg3 而不是 arg2?
更新 1 代码将更改如下:
# The function signature
def test_func(arg1, arg2, arg3, **kwargs):
# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)
这可以使用 python 位置和关键字赋值来完成.请注意,尽管 kwargs
位于关键字分配的值之后,但它被放在一边并且没有使用 关键字 进行分配.
This can be done using the python positional and keyword assignment.Note that the kwargs
is left aside and not assigned using a keyword despite the fact that it's located after a keyword assigned value.
可以在此处找到有关参数分配差异的更多信息.
More information about argument assignment differences can be found here.
这篇关于具有多个 args 和 kwargs 的函数的 Multiprocessing.pool的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!