我是the futures module的新手,有一项可以从并行化中受益的任务;但是我似乎无法确切地知道如何为线程设置函数以及为进程设置函数。任何人都可以在此问题上提供的帮助,我将不胜感激。

我正在运行particle swarm optimization (PSO)。在不深入了解PSO本身的情况下,以下是我的代码的基本布局:

有一个Particle类,带有getFitness(self)方法(该方法计算一些度量并将其存储在self.fitness中)。一个PSO仿真具有多个粒子实例(对于某些仿真,很容易超过10个; 100s甚至1000s)。
每隔一段时间,我必须计算粒子的适合度。目前,我在for循环中执行此操作:

for p in listOfParticles:
  p.getFitness(args)

但是,我注意到每个粒子的适应度可以彼此独立地计算。这使得适合度计算成为并行化的主要候选者。确实,我可以做map(lambda p: p.getFitness(args), listOfParticles)

现在,我可以使用futures.ProcessPoolExecutor轻松地做到这一点:
with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)

由于调用p.getFitness的副作用存储在每个粒子本身中,因此我不必担心会从futures.ProcessPoolExecutor()获得返回。

到目前为止,一切都很好。但是现在我注意到ProcessPoolExecutor创建了新进程,这意味着它复制了内存,这很慢。我希望能够共享内存-因此我应该使用线程。很好,直到我意识到运行多个进程并在每个进程中包含多个线程可能会更快,因为多个线程仍然仅在我的8核机器上的一个处理器上运行。

这是我遇到麻烦的地方:
根据我所看到的示例,ThreadPoolExecutorlist上运行。 ProcessPoolExecutor也是如此。因此,我无法对ProcessPoolExecutor进行任何迭代操作以将其移植到ThreadPoolExecutor,因为ThreadPoolExecutor将使单个对象可以工作(请参阅下面的尝试)。
另一方面,我不能自己切片listOfParticles,因为我希望ThreadPoolExecutor自己做魔术,以找出需要多少个线程。

所以,这个大问题(终于结束了):
我应该如何构造我的代码,以便可以同时使用两个进程和线程来并行执行以下操作:
for p in listOfParticles:
  p.getFitness()

这是我一直在尝试的方法,但是我不敢尝试运行它,因为我知道它不起作用:
>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
...

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...

对于如何解决此问题,甚至对如何改进我的方法,我将不胜感激

万一重要,我在python3.3.2上

最佳答案

我将为您提供将进程与线程混合在一起以解决问题的工作代码,但这不是您所期望的;-)首先要做的是制作一个不会危害您真实数据的模拟程序。尝试一些无害的东西。所以这是开始:

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i

现在我们可以玩些东西了。接下来的一些常量:
MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100

摆弄那些尝尝。 CHUNKSIZE将在后面说明。

给您的第一个惊喜是我最低级别的worker函数的功能。那是因为您在这里过于乐观:



,在辅助进程中完成的任何操作都不会对您的主程序中的Particle实例产生任何影响。工作进程可以通过Particle的写时复制实现方式来处理fork()实例的副本,或者因为它正在处理通过解开跨进程传递的Particle泡菜而制成的副本。

因此,如果您希望主程序查看适应性结果,则需要安排将信息发送回主程序。因为我对您的实际程序不了解,所以在这里我假设Particle().i是一个唯一的整数,并且主程序可以轻松地将整数映射回Particle实例。考虑到这一点,这里最低级别的worker函数需要返回一个对:唯一的整数和适应性结果:
def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

鉴于此,很容易在线程之间散布Particle的列表,并返回(particle_id, fitness)结果的列表:
def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

笔记:
  • 这是每个工作进程将运行的功能。
  • 我使用的是Python 3,因此请使用list()强制e.map()实现列表中的所有结果。
  • 如评论中所述,在CPython下,将CPU绑定(bind)的任务分散到各个线程中要比在单个线程中全部完成慢。

  • 只需编写代码即可在整个进程中散布Particle列表,然后检索结果。使用multiprocessing很难做到这一点,所以这就是我要使用的。我不知道concurrent.futures是否可以做到(假设我们也在线程中混合使用),但不在乎。但是,因为我正在为您提供工作代码,所以您可以使用它并进行报告;-)
    if __name__ == "__main__":
        import multiprocessing
    
        particles = [Particle(i) for i in range(100000)]
        # Note the code below relies on that particles[i].i == i
        assert all(particles[i].i == i for i in range(len(particles)))
    
        pool = multiprocessing.Pool(MAX_PROCESSES)
        for result_list in pool.imap_unordered(proc_worker,
                          (particles[i: i+CHUNKSIZE]
                           for i in range(0, len(particles), CHUNKSIZE))):
            for i, fitness in result_list:
                particles[i].fitness = fitness
    
        pool.close()
        pool.join()
    
        assert all(p.fitness == 2*p.i for p in particles)
    

    笔记:
  • 我要“手工”将Particle的列表分成多个块。那就是CHUNKSIZE的目的。那是因为一个工作进程想要一个Particle列表来工作,而这又是futures map()函数想要的。无论如何都要分块工作是一个好主意,因此您会得到一些实实在在的返回,以换取每次调用时的进程间开销。
  • imap_unordered()不保证返回结果的顺序。这为实现提供了更大的自由,以便尽可能高效地安排工作。而且我们不在乎这里的顺序,所以很好。
  • 请注意,循环将检索(particle_id, fitness)结果,并相应地修改Particle实例。也许您真正的.getfitnessParticle实例进行了其他修改-无法猜测。无论如何,主程序将永远不会看到“通过魔术”对 worker 造成的任何突变-您必须明确地安排它。在限制范围内,您可以改为返回(particle_id, particle_instance)对,并替换主程序中的Particle实例。然后,它们将反射(reflect)出工作流程中发生的所有变异。

  • 玩得开心 :-)

    future 一路下跌

    原来,替换multiprocessing非常容易。这是更改。这也(如前所述)替换了原始的Particle实例,以便捕获所有突变。但是,这里有一个折衷:腌制一个实例比腌制一个“适合”结果需要“更多”的字节。更多的网络流量。选择你的毒药;-)

    返回变异的实例只需要替换thread_worker()的最后一行,就像这样:
    return (p.i, p)
    

    然后,用以下内容替换所有“ main ”块:
    def update_fitness():
        import concurrent.futures as cf
        with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
            for result_list in e.map(proc_worker,
                          (particles[i: i+CHUNKSIZE]
                           for i in range(0, len(particles), CHUNKSIZE))):
                for i, p in result_list:
                    particles[i] = p
    
    if __name__ == "__main__":
        particles = [Particle(i) for i in range(500000)]
        assert all(particles[i].i == i for i in range(len(particles)))
    
        update_fitness()
    
        assert all(particles[i].i == i for i in range(len(particles)))
        assert all(p.fitness == 2*p.i for p in particles)
    

    该代码与multiprocessor舞蹈非常相似。就个人而言,我会使用multiprocessing版本,因为imap_unordered很有值(value)。这是简化界面的问题:他们通常以隐藏有用的可能性为代价来购买简单性。

    关于python - ProcessPoolExecutor内部的ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19994478/

    10-11 18:44