我已经实现了一种遗传搜索算法,并试图对其进行并行化,但是性能却很糟糕(比单线程还差)。我怀疑这是由于通信开销。
我在下面提供了伪代码,但是从本质上讲,遗传算法会创建大量的“染色体”对象,然后运行以下操作的许多迭代:
根据每个染色体在“世界”中的表现对其进行评分。在迭代过程中,世界保持静态。
根据上一步计算出的分数随机选择一个新的人口
转到步骤1进行n次迭代
计分算法(第1步)是主要瓶颈,因此分配此代码的处理似乎很自然。
我遇到了两个问题,希望可以得到帮助:
如何将计算出的分数与map()
传递给计分函数的对象链接,即将持有分数的每个Future
链接回Chromosome
?我通过让calculate_scores()
方法返回对象以非常笨拙的方式完成了此操作,但实际上,如果有更好的方法来维护链接,我所需要的只是发送回float
。
评分功能的并行处理可以正常工作,尽管map()
遍历所有对象都需要很长时间。但是,与单线程版本相比,随后对draw_chromosome_from_pool()
的调用运行非常缓慢,以至于我尚未看到它完成。我不知道是什么原因造成的,因为该方法在单线程版本中总是很快完成。即使所有期货都已完成,是否还有一些IPC正在将染色体拉回本地流程?是否以某种方式取消了本地流程的优先级?
我担心每个周期建立/重建池的整体迭代性质将导致大量数据传输给工作人员。这个问题根源的问题是:Dask什么时候以及什么时候实际将数据来回发送到工作池。即,什么时候Environment()和Chromosome()一起分发,结果如何/何时返回?我已经读过docs,但是要么找不到正确的细节,要么太愚蠢而无法理解。
理想情况下,我认为(但有待纠正)我想要的是一种分布式体系结构,其中每个工作人员均以“永久”方式在本地保存Environment()
数据,然后分配Chromosome()
实例数据以进行评分,几乎没有重复迭代之间保留不变的Chromosome()数据。
帖子很长,所以如果您花时间阅读本文,已经谢谢您了!
class Chromosome(object): # Small size: several hundred bytes per instance
def get_score():
# Returns a float
def set_score(i):
# Stores a a float
class Environment(object): # Large size: 20-50Mb per instance, but only one instance
def calculate_scores(chromosome):
# Slow calculation using attributes from chromosome and instance data
chromosome.set_score(x)
return chromosome
class Evolver(object):
def draw_chromosome_from_pool(self, max_score):
while True:
individual = np.random.choice(self.chromosome_pool)
selection_chance = np.random.uniform()
if selection_chance < individual.get_score() / max_score:
return individual
def run_evolution()
self.dask_client = Client()
self.chromosome_pool = list()
for i in range(10000):
self.chromosome_pool.append( Chromosome() )
world_data = LoadWorldData() # Returns a pandas Dataframe
self.world = Environment(world_data)
iterations = 1000
for i in range(iterations):
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
for future in as_completed(futures):
c = future.result()
highest_score = max(highest_score, c.get_score())
new_pool = set()
while len(new_pool)<self.pool_size:
mother = self.draw_chromosome_from_pool(highest_score)
# do stuff to build a new pool
最佳答案
是的,每次您拨打电话时
futures = self.dask_client.map(self.world.calculate_scores, self.chromosome_pool)
您正在序列化
self.world
,它很大。您可以在循环之前执行一次future_world = client.scatter(self.world, broadcast=True)
然后在循环中
futures = self.dask_client.map(lambda ch: Environment.calculate_scores(future_world, ch), self.chromosome_pool)
将使用工作程序上已经存在的副本(或执行相同操作的简单函数)。关键是
future_world
只是指向已经分发的内容的指针,但是dask会为您解决这个问题。关于哪个染色体是哪个问题:使用
as_completed
会中断您将它们提交给map
的顺序,但这对于您的代码不是必需的。您可能已使用wait
处理所有工作的完成时间,或者仅对future.result()
进行迭代(这将等待每个任务完成),然后将顺序保留在chromosome_pool中。