下面是我正在使用的代码的一个简化版本:我有一个python类,它带有一个实例方法,该方法接受一个字符串列表,并为每个字符串计算一个结果,最终在返回之前将结果合并,如下所示:
class Foo(object):
def do_task(stringList):
for s in stringList:
result = computeResult(s)
# combine results below...
由于使用字符串的计算都是独立的(而且相当昂贵),所以我尝试将该操作与多处理模块中的Pool类并行。因此,我定义了一个并行版本的
do_task
如下(我目前只是打印单独的结果,而不是组合它们):def do_task_parallel(stringList):
numProcs = 2
pool = Pool(processes=numProcs)
chunksize = int(math.ceil(len(stringList) / float(numProcs)))
for result in pool.imap(self.do_task, stringList, chunksize):
print result
pool.close()
根据我对Pool如何工作的理解(基于我阅读的文档和示例),这应该将stringList iterable分成大小大致为chunkSize的块,每个块作为任务提交给Pool中的一个进程。因此,如果我有一个列表
stringList = ["foo1", "foo2", "foo3", "foo4"]
被分成两个进程(给出块大小为2),pool应该将其分成stringList1 = ["foo1", "foo2"]
和stringList2 = ["foo3", "foo4"]
,这将由两个不同的进程并行处理。然而,当我创建一个Foo()对象并调用
foo.do_task_parallel(stringList)
时,池似乎正在将我的stringList
中的每个元素分别传递给do_task
(作为一个块的一部分)。这不仅不会加速我的代码,而且会使代码不正确,实际上会使代码变慢,因为do_task
然后对四个单独调用中每个传入的一个输入字符串的每个字符调用computeResult
。我期望两个调用,每个调用处理一个大小为2的输入列表,而不是四个调用处理一个输入字符串。我查过了,chunksize
确实是2。我做错什么了?如果有帮助的话,我将通过cygwin在Windows 7上运行python 2.7.3。 最佳答案
您的理解是关闭的;-)chunksize
纯粹是一个可选的优化:它不会更改传递给辅助函数的内容,它只会向multiprocessing
机器提示一次要通过内部进程间管道发送多少任务。
如果希望向工作函数传递字符串列表,则必须显式编码该字符串。例如,为了清晰起见,将其粘贴在多行上:
chunks = [stringList[i: i+chunksize]
for i in xrange(0, len(stringList), chunksize)]
for result in pool.imap(self.do_task, chunks):
print result
关于python - Python多处理池未正确分块,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/20170094/