下面是我正在使用的代码的一个简化版本:我有一个python类,它带有一个实例方法,该方法接受一个字符串列表,并为每个字符串计算一个结果,最终在返回之前将结果合并,如下所示:

class Foo(object):
    def do_task(stringList):
        for s in stringList:
            result = computeResult(s)
        # combine results below...

由于使用字符串的计算都是独立的(而且相当昂贵),所以我尝试将该操作与多处理模块中的Pool类并行。因此,我定义了一个并行版本的do_task如下(我目前只是打印单独的结果,而不是组合它们):
def do_task_parallel(stringList):
    numProcs = 2
    pool = Pool(processes=numProcs)
    chunksize = int(math.ceil(len(stringList) / float(numProcs)))
    for result in pool.imap(self.do_task, stringList, chunksize):
        print result
    pool.close()

根据我对Pool如何工作的理解(基于我阅读的文档和示例),这应该将stringList iterable分成大小大致为chunkSize的块,每个块作为任务提交给Pool中的一个进程。因此,如果我有一个列表stringList = ["foo1", "foo2", "foo3", "foo4"]被分成两个进程(给出块大小为2),pool应该将其分成stringList1 = ["foo1", "foo2"]stringList2 = ["foo3", "foo4"],这将由两个不同的进程并行处理。
然而,当我创建一个Foo()对象并调用foo.do_task_parallel(stringList)时,池似乎正在将我的stringList中的每个元素分别传递给do_task(作为一个块的一部分)。这不仅不会加速我的代码,而且会使代码不正确,实际上会使代码变慢,因为do_task然后对四个单独调用中每个传入的一个输入字符串的每个字符调用computeResult。我期望两个调用,每个调用处理一个大小为2的输入列表,而不是四个调用处理一个输入字符串。我查过了,chunksize确实是2。我做错什么了?如果有帮助的话,我将通过cygwin在Windows 7上运行python 2.7.3。

最佳答案

您的理解是关闭的;-)chunksize纯粹是一个可选的优化:它不会更改传递给辅助函数的内容,它只会向multiprocessing机器提示一次要通过内部进程间管道发送多少任务。
如果希望向工作函数传递字符串列表,则必须显式编码该字符串。例如,为了清晰起见,将其粘贴在多行上:

chunks = [stringList[i: i+chunksize]
          for i in xrange(0, len(stringList), chunksize)]

for result in pool.imap(self.do_task, chunks):
    print result

关于python - Python多处理池未正确分块,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/20170094/

10-11 07:47