没有有关如何将pCollections转换为输入到.CoGroupByKey()所需的pCollections的文档
语境
本质上,我有两个大的pCollections,并且我需要能够找到两者之间的差异,以进行II型ETL更改(如果pColl1中不存在它,则添加到pColl2中的嵌套字段中),以便能够保留BigQuery中这些记录的历史记录。
管道架构:
将BQ表读入2个pCollection中:dwsku和product。
将CoGroupByKey()应用于这两个集合以返回->结果
解析结果以查找并将dwsku中的所有更改嵌套到产品中。
任何帮助将被推荐。我在SO上找到了一个Java链接,该链接执行我需要完成的相同操作(但Python SDK上没有任何内容)。
Convert from PCollection<TableRow> to PCollection<KV<K,V>>
是否有Apache Beam(尤其是Python SDK)的文档/支持?
最佳答案
为了使CoGroupByKey()
工作,您需要具有PCollections
的tuples
,其中第一个元素是键,第二个元素是数据。
在您的情况下,您说您拥有BigQuerySource
,在当前版本的Apache Beam中,该输出为PCollection of dictionaries
(code),其中每个条目代表表中已读取的一行。如上所述,您需要将此PCollections映射到元组。使用ParDo
很容易做到:
class MapBigQueryRow(beam.DoFn):
def process(self, element, key_column):
key = element.get(key_column)
yield key, element
data1 = (p
| "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
| "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))
data2 = (p
| "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
| "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))
co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())
# do your processing with co_grouped here
顺便说一句,可以在here中找到Apache Beam的Python SDK的文档。