我想创建一个函数,给定一个函数列表和相关参数,该函数将启动尽可能多的进程以并行化这些任务。正在运行的进程数不能超过我CPU的内核数。当一个过程完成时,应将其替换为另一个,直到结束。

我试图使用python池来实现这样的事情。这是我的功能:

from multiprocessing import Pool, cpu_count

CPUS = cpu_count()

def parallelize(functions, args):
    results = []
    if CPUS > 1:
        for i in xrange(0, len(functions), CPUS):
            pool = Pool()
            for j in xrange(CPUS):
                if i + j >= len(functions):
                    break
                results.append(pool.apply_async(functions[i + j], args = args[i + j]))
            pool.close()
            pool.join()
        map(lambda x: x.get(), results)
    else:
        for i in xrange(len(functions)):
            results.append(functions[i](*args[i]))
    return results


此实现将功能列表细分为大量。每个批量尺寸等于实际CPU的数量。问题在于,它实际上要等到每一个函数完成后,再重新启动另一个过程。
我不希望出现这种情况,因为如果批量中有一个非常慢的功能,另一个cpus将在启动新进程之前等待其完成。

正确的方法是什么?

最佳答案

看来您过于复杂了。 multiprocessing.Pool将始终以您告诉它的确切数量的进程运行,而不管您赋予它多少工作项。因此,如果您创建Pool(CPUS),则即使您将Pool任务提供给CPUS任务,CPUS * 100也永远不会同时运行多个apply_async任务。因此,它满足了您的需求,即您无需执行任何特殊工作就永远不能运行超过CPU数量的任务。因此,您可以遍历整个方法和参数列表,并在它们上调用Pool而不用担心完全批量处理这些调用。 CPUS将一次执行所有任务,但一次最多执行个任务:

def parallelize(methods, args):
    results = []
    if CPUS > 1:
        pool = Pool(CPUS)
        for method, arg in zip(methods, args):
            results.append(pool.apply_async(method, args=arg))
        pool.close()
        pool.join()
        out = map(lambda x: x.get(), results)
    else:
        for i in xrange(len(methods)):
            results.append(methods[i](*args[i]))
    return results

09-05 23:13