我在并行分配功能时遇到问题。

问题陈述:我有2个坐标对列表,分别为dfCdfO。对于dfC中的每个obs,我计算有多少dfO落在半径r内。我目前有一个正常工作的功能,但我试图查看是否可以并行处理此功能。

问题是:dfC可以拆分并单独处理...但是每个工人的dfO必须为100%。我的方法是,让我先使其并行工作-然后再担心如何将dfO的完整副本分发给工作人员。除非有人可以帮助我解决这两个问题?

首先,下面是设置所有内容的代码:

import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, process
import traceback
from scipy.spatial import cKDTree

# create 2 dataframes with random "coordinates"
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy'))
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk'))


这是dfC外观的示例,dfO看起来类似

+----+----+
|  x |  y |
+----+----+
| 35 |  5 |
+----+----+
| 96 | 18 |
+----+----+
| 23 | 25 |
+----+----+
| 20 | 7  |
+----+----+
| 74 | 54 |
+----+----+


接下来,这是像魅力一样起作用的功能。我不是故意单独传递所有参数,而是实际上是故意这样做的-准备主函数以并行方式调用它们(否则,我无法找到一种进行多处理的方法)。

# this function works on dfC, and adds a row which counts the number
# of objects in dfO which are within radius r
def worker_job(args):
    try:
        dfC, dfO, newcol, r = args

        mxC=dfC.as_matrix()
        mxO = dfO.as_matrix()

        # magic tree stuff
        C_Tree = cKDTree(mxC)
        O_Tree = cKDTree(mxO)

        listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0)

        counts=[]
        for i in listoflists:
            counts.append(len(i))

        s = pd.Series(counts)

        dfC[newcol] = s.values

    except:
        raise
        traceback.print_exc()
    else:
        return dfC


如果我这样创建参数:
args=[dfC,dfO,"new_column_name",3]

当我自己运行它时,它可以完美运行:
worker_job(args)

+----+----+-----------------+
|  x |  y | new_column_name |
+----+----+-----------------+
| 35 |  5 |        4        |
+----+----+-----------------+
| 96 | 18 |        1        |
+----+----+-----------------+
| 23 | 25 |        0        |
+----+----+-----------------+
| 20 |  7 |        1        |
+----+----+-----------------+
| 74 | 54 |        2        |
+----+----+-----------------+


现在,我尝试构建将控制并行工作程序并并行运行此程序的函数。这是我的最大努力:

# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
    try:
        pool = Pool(Num_Proc)
        parts = pool.map(Function,args)
        pool.close()
        pool.join()

        results_df = pd.concat(parts)

    except:
        pool.close()
        pool.terminate()
        traceback.print_exc()
    else:
        return results_df


它不会工作。 Run_Parallel(worker_job,2,args)引发有关ValueError: not enough values to unpack (expected 4, got 2)的错误。在通过包装程序时,该参数列表必定发生了某些事情。

我正在特别寻求有关此错误的指导,以及为知道如何解决更大问题的任何人的奖励积分-这就是我需要我的池中包含100%的dfO和仅为dfC的子集效率。

最佳答案

答案是将参数作为列表列表传递。这也解决了拆分数据帧的另一个问题(我认为默认情况下,pool会处理此问题,但事实并非如此)。

正确的功能应如下所示:

# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
    dfC, dfO, newcol, r = args

    # to make lists of lists
    argslist=[]
    dfOlist=[]
    dfClist=[]
    resultlist=[]

    # split dfC into parts
    Cparts=np.array_split(dfC, Num_Proc)

    # build the lists
    for i in range(Num_Proc):
        argslist.append([Cparts[i],dfO,newcol,r])


    try:
        pool = Pool(Num_Proc)
        parts = pool.map(Function,argslist)
        pool.close()
        pool.join()

        results_df = pd.concat(parts)

    except:
        pool.close()
        pool.terminate()
        traceback.print_exc()
    else:
        return results_df

08-16 17:02