语境
我正在使用类似如下的过程从Beam中的Google Storage中读取文件:
data = pipeline | beam.Create(['gs://my/file.pkl']) | beam.ParDo(LoadFileDoFn)
LoadFileDoFn
在其中加载文件并从中创建对象的Python列表,然后ParDo
作为PCollection
返回。我知道我可能可以实现一个自定义源来实现类似的目的,但是this answer和Beam的own documentation表明,这种通过
ParDo
读取伪数据集的方法并不罕见,并且自定义源可能会过大。它也可以工作-我得到了带有正确数量元素的
PCollection
,可以根据自己的喜好进行处理!然而..自动缩放问题
生成的
PCollection
在Cloud Dataflow上根本不会自动缩放。我首先必须通过以下方式对其进行转换:shuffled_data = data | beam.Shuffle()
我知道我也在上面链接的this answer解释了此过程的大部分内容-但它没有提供任何有关为什么这样做的必要见解。就Beam的高度抽象而言,我在混洗之前有一个带有N个元素的PCollection
,在混洗之后有一个相似的PCollection
。为什么一个缩放,而另一个不缩放?在这种情况下(或通常来说,文档不是很有帮助),但这是另一回事。第一个
PCollection
具有什么隐藏属性,可以防止将其分配给其他人没有的多个工作程序? 最佳答案
当您通过Create阅读时,您正在创建绑定(bind)到1个worker的PCollection。由于没有与项目关联的键,因此没有分配工作的机制。 Shuffle()将在封面下创建一个K,V,然后随机播放,这使PCollection项目可以在旋转时分配给新 worker 。您可以通过关闭自动缩放功能并将工作人员的大小固定为25来验证此行为-如果没有随机播放,您只会看到1个工作人员在工作。
创建/读取时分发此工作的另一种方法是构建您自己的自定义I/O,以读取PKL文件1。您将创建适当的拆分器;但是,不知道您腌制了什么,可能无法拆分。 IMO Shuffle()是一个安全的选择,您可以通过编写可拆分阅读器来优化获得的模数。
关于python - 为什么我需要改组PCollection才能在Cloud Dataflow上自动缩放?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53885306/