本文介绍了具有多个 args 和 kwargs 的函数的 Multiprocessing.pool的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 mutliprocessing.pool 方法并行计算.问题是我想在计算中使用的函数有两个 args 和可选的 kwargs,第一个参数是数据帧,第二个参数是 str,任何 kwargs 是字典.

I would like to parallelise a calculation using the mutliprocessing.pool method. The problem is that the function I would like to use in the calculation presents two args and optional kwargs, being the first argument a dataframe, the second one a str and any kwargs a dictionary.

对于我尝试执行的所有计算,我想要使用的数据框和字典都是相同的,只是第二个不断变化的参数.因此,我希望能够使用 map 方法将它作为不同字符串的列表传递给已经使用 df 和 dict 打包的函数.

Both the dataframe and the dictionary I want to use are the same for all the calculations I am trying to carry out, being only the second arg the one that keeps changing. I was therefore hoping to be able to pass it as a list of different strings using the map method to the already packed function with the df and dict.

from utils import *
import multiprocessing
from functools import partial



def sumifs(df, result_col, **kwargs):

    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if type(kwargs[col]) == tuple:
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if (cache_key in cache):
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    result_col= ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    print(results)

但是,当我运行上面的代码时,我收到以下错误:TypeError: sumifs() missing 1 required positional argument: 'result_col'.我如何为函数提供第一个 arg 和 kwargs,同时提供第二个参数作为 str 列表,以便我可以并行计算?我在论坛中阅读了几个类似的问题,但似乎没有一个解决方案适用于这种情况......

However, when I run the code above I get the following error: TypeError: sumifs() missing 1 required positional argument: 'result_col'. How could I provide the function with the first arg and kwargs, while providing the second argument as a list of str so I can paralelise the calculation? I have read several similar questions in the forum but none of the solutions seem to work for this case...

谢谢,如果有什么不明白的地方,我很抱歉,我今天才知道多处理包!

Thank you and apologies if something is not clear, I just learnt of the multiprocessing package today!

推荐答案

让我们看看你的代码的两部分.

Let's have a look at two part of your code.

首先是sumifs函数声明:

def sumifs(df, result_col, **kwargs):

其次,使用相关参数调用此函数.

Secondly, the call to this function with the relevant parameters.

# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}

# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)

更新 1:

修改原代码后,看来是位置参数赋值的问题,尽量舍弃.


Update 1:

After the original code has been edited.It look like the problem is the positional argument assignment, try to discard it.

替换行:

results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)

与:

results = pool.map(partial(sumifs, ca, **keywords), result_col)

示例代码:

import multiprocessing
from functools import partial

def test_func(arg1, arg2, **kwargs):
    print(arg1)
    print(arg2)
    print(kwargs)
    return arg2

if __name__ == '__main__':
    list_of_args2 = [1, 2, 3]
    just_a_dict = {'key1': 'Some value'}
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
    print(results)

将输出:

This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
['1', '2', '3']

关于如何Multiprocessing.pool 具有多个 args 和 kwargs 的函数

扩展示例(由于评论):

Extended example (due to comments):

然而,我想知道,如果我的函数有三个 args 和 kwargs,并且我想保持 arg1、arg3 和 kwargs 不变,我怎么能将 arg2 作为多处理列表传递?本质上,我将如何指示映射的多处理(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) 部分中的第二个值对应于 arg3 而不是 arg2?

更新 1 代码将更改如下:

# The function signature
def test_func(arg1, arg2, arg3, **kwargs):

# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)

这可以使用 python 位置和关键字赋值来完成.请注意,尽管 kwargs 位于关键字分配的值之后,但它被放在一边并且没有使用 关键字 进行分配.

This can be done using the python positional and keyword assignment.Note that the kwargs is left aside and not assigned using a keyword despite the fact that it's located after a keyword assigned value.

可以在此处找到有关参数分配差异的更多信息.

More information about argument assignment differences can be found here.

这篇关于具有多个 args 和 kwargs 的函数的 Multiprocessing.pool的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-04 18:30