我想创建一个函数,给定一个函数列表和相关参数,该函数将启动尽可能多的进程以并行化这些任务。正在运行的进程数不能超过我CPU的内核数。当一个过程完成时,应将其替换为另一个,直到结束。
我试图使用python池来实现这样的事情。这是我的功能:
from multiprocessing import Pool, cpu_count
CPUS = cpu_count()
def parallelize(functions, args):
results = []
if CPUS > 1:
for i in xrange(0, len(functions), CPUS):
pool = Pool()
for j in xrange(CPUS):
if i + j >= len(functions):
break
results.append(pool.apply_async(functions[i + j], args = args[i + j]))
pool.close()
pool.join()
map(lambda x: x.get(), results)
else:
for i in xrange(len(functions)):
results.append(functions[i](*args[i]))
return results
此实现将功能列表细分为大量。每个批量尺寸等于实际CPU的数量。问题在于,它实际上要等到每一个函数完成后,再重新启动另一个过程。
我不希望出现这种情况,因为如果批量中有一个非常慢的功能,另一个cpus将在启动新进程之前等待其完成。
正确的方法是什么?
最佳答案
看来您过于复杂了。 multiprocessing.Pool
将始终以您告诉它的确切数量的进程运行,而不管您赋予它多少工作项。因此,如果您创建Pool(CPUS)
,则即使您将Pool
任务提供给CPUS
任务,CPUS * 100
也永远不会同时运行多个apply_async
任务。因此,它满足了您的需求,即您无需执行任何特殊工作就永远不能运行超过CPU数量的任务。因此,您可以遍历整个方法和参数列表,并在它们上调用Pool
而不用担心完全批量处理这些调用。 CPUS
将一次执行所有任务,但一次最多执行个任务:
def parallelize(methods, args):
results = []
if CPUS > 1:
pool = Pool(CPUS)
for method, arg in zip(methods, args):
results.append(pool.apply_async(method, args=arg))
pool.close()
pool.join()
out = map(lambda x: x.get(), results)
else:
for i in xrange(len(methods)):
results.append(methods[i](*args[i]))
return results