考虑到键/值对的数据源相对较小(3,000-10,000),我试图仅处理满足组阈值(50-100)的记录。因此,最简单的方法是通过键,过滤和展开将它们分组-使用FlatMap或ParDo。到目前为止,最大的组只有1,500个记录。但这似乎是Google Cloud Dataflow生产中的严重瓶颈。
有给定清单
(1,1)
(1、2)
(1,3)
...
(2,1)
(2,2)
(2、3)
...
运行一组转换以按键过滤和分组:
p | 'Group' >> beam.GroupByKey()
| 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
| 'Unwind' >> beam.FlatMap(lambda (key, values): values)
关于如何提高性能的任何想法?谢谢你的帮助!
最佳答案
对于管道来说,这是一个有趣的特殊情况。我认为您的问题在于读取GroupByKey
数据的方式。让我简要介绍一下GBK的工作原理。
什么是GroupByKey
,以及大数据系统如何实现
所有大数据系统都实现了在同一键的多个元素上实现操作的方式。在MapReduce中,此方法称为reduce;在其他大数据系统中,此方法称为Group By Key或Combine。
进行GroupByKey
转换时,Dataflow需要将单个键的所有元素收集到同一台计算机中。由于同一密钥的不同元素可能在不同的机器中处理,因此需要以某种方式序列化数据。
这意味着,当您读取来自GroupByKey
的数据时,您正在访问工作线程的IO(即,不是从内存中),因此,您确实要避免过多地读取随机数据。
这如何转化为您的管道
我相信这里的问题是Filter
和Unwind
都将分别从shuffle读取数据(因此您将两次读取每个列表的数据)。您要做的就是只读取一次随机播放数据。您可以使用单个FlatMap
来执行此操作,该过滤器可以过滤和展开数据,而无需从shuffle中读取数据。像这样:
def unwind_and_filter((key, values)):
# This consumes all the data from shuffle
value_list = list(values)
if len(value_list) > 50:
yield value_list
p | 'Group' >> beam.GroupByKey()
| 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)
让我知道是否有帮助。