考虑到键/值对的数据源相对较小(3,000-10,000),我试图仅处理满足组阈值(50-100)的记录。因此,最简单的方法是通过键,过滤和展开将它们分组-使用FlatMap或ParDo。到目前为止,最大的组只有1,500个记录。但这似乎是Google Cloud Dataflow生产中的严重瓶颈。

有给定清单

(1,1)
(1、2)
(1,3)
...
(2,1)
(2,2)
(2、3)
...

运行一组转换以按键过滤和分组:

p | 'Group' >> beam.GroupByKey()
  | 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
  | 'Unwind' >> beam.FlatMap(lambda (key, values): values)


关于如何提高性能的任何想法?谢谢你的帮助!

最佳答案

对于管道来说,这是一个有趣的特殊情况。我认为您的问题在于读取GroupByKey数据的方式。让我简要介绍一下GBK的工作原理。

什么是GroupByKey,以及大数据系统如何实现

所有大数据系统都实现了在同一键的多个元素上实现操作的方式。在MapReduce中,此方法称为reduce;在其他大数据系统中,此方法称为Group By Key或Combine。

进行GroupByKey转换时,Dataflow需要将单个键的所有元素收集到同一台计算机中。由于同一密钥的不同元素可能在不同的机器中处理,因此需要以某种方式序列化数据。

这意味着,当您读取来自GroupByKey的数据时,您正在访问工作线程的IO(即,不是从内存中),因此,您确实要避免过多地读取随机数据。

这如何转化为您的管道

我相信这里的问题是FilterUnwind都将分别从shuffle读取数据(因此您将两次读取每个列表的数据)。您要做的就是只读取一次随机播放数据。您可以使用单个FlatMap来执行此操作,该过滤器可以过滤和展开数据,而无需从shuffle中读取数据。像这样:

def unwind_and_filter((key, values)):
  # This consumes all the data from shuffle
  value_list = list(values)
  if len(value_list) > 50:
    yield value_list

p | 'Group' >> beam.GroupByKey()
  | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)


让我知道是否有帮助。

10-02 11:01