我已经实现了一种遗传搜索算法,并试图对其进行并行化,但是性能却很糟糕(比单线程还差)。我怀疑这是由于通信开销。

我在下面提供了伪代码,但是从本质上讲,遗传算法会创建大量的“染色体”对象,然后运行以下操作的许多迭代:


根据每个染色体在“世界”中的表现对其进行评分。在迭代过程中,世界保持静态。
根据上一步计算出的分数随机选择一个新的人口
转到步骤1进行n次迭代


计分算法(第1步)是主要瓶颈,因此分配此代码的处理似乎很自然。

我遇到了两个问题,希望可以得到帮助:


如何将计算出的分数与map()传递给计分函数的对象链接,即将持有分数的每个Future链接回Chromosome?我通过让calculate_scores()方法返回对象以非常笨拙的方式完成了此操作,但实际上,如果有更好的方法来维护链接,我所需要的只是发送回float
评分功能的并行处理可以正常工作,尽管map()遍历所有对象都需要很长时间。但是,与单线程版本相比,随后对draw_chromosome_from_pool()的调用运行非常缓慢,以至于我尚未看到它完成。我不知道是什么原因造成的,因为该方法在单线程版本中总是很快完成。即使所有期货都已完成,是否还有一些IPC正在将染色体拉回本地流程?是否以某种方式取消了本地流程的优先级?
我担心每个周期建立/重建池的整体迭代性质将导致大量数据传输给工作人员。这个问题根源的问题是:Dask什么时候以及什么时候实际将数据来回发送到工作池。即,什么时候Environment()和Chromosome()一起分发,结果如何/何时返回?我已经读过docs,但是要么找不到正确的细节,要么太愚蠢而无法理解。


理想情况下,我认为(但有待纠正)我想要的是一种分布式体系结构,其中每个工作人员均以“永久”方式在本地保存Environment()数据,然后分配Chromosome()实例数据以进行评分,几乎没有重复迭代之间保留不变的Chromosome()数据。

帖子很长,所以如果您花时间阅读本文,已经谢谢您了!

class Chromosome(object):    # Small size: several hundred bytes per instance
     def get_score():
          # Returns a float
     def set_score(i):
          # Stores a a float

class Environment(object):   # Large size: 20-50Mb per instance, but only one instance
         def calculate_scores(chromosome):
             # Slow calculation using attributes from chromosome and instance data
             chromosome.set_score(x)
             return chromosome

class Evolver(object):
    def draw_chromosome_from_pool(self, max_score):
        while True:
            individual = np.random.choice(self.chromosome_pool)
            selection_chance = np.random.uniform()
            if selection_chance < individual.get_score() / max_score:
                return individual

    def run_evolution()
         self.dask_client = Client()
         self.chromosome_pool = list()
         for i in range(10000):
             self.chromosome_pool.append( Chromosome() )

         world_data = LoadWorldData() # Returns a pandas Dataframe
         self.world = Environment(world_data)

         iterations = 1000
         for i in range(iterations):
             futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
             for future in as_completed(futures):
                  c = future.result()
                  highest_score = max(highest_score, c.get_score())

             new_pool = set()
             while len(new_pool)<self.pool_size:
                 mother = self.draw_chromosome_from_pool(highest_score)
                  # do stuff to build a new pool

最佳答案

是的,每次您拨打电话时

futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)


您正在序列化self.world,它很大。您可以在循环之前执行一次

future_world = client.scatter(self.world, broadcast=True)


然后在循环中

futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)


将使用工作程序上已经存在的副本(或执行相同操作的简单函数)。关键是future_world只是指向已经分发的内容的指针,但是dask会为您解决这个问题。

关于哪个染色体是哪个问题:使用as_completed会中断您将它们提交给map的顺序,但这对于您的代码不是必需的。您可能已使用wait处理所有工作的完成时间,或者仅对future.result()进行迭代(这将等待每个任务完成),然后将顺序保留在chromosome_pool中。

10-06 12:48