I am thinking about loading a file into a one-dimensional table. My solution is:
Beam reads the file.
Create a side input from the database with the existing data.
In a ParDo: filter out the records that are already present in the side input.
A BigQuery sink writes the remaining records into the database.
I would like to ask whether anyone has implemented this flow. Could you give me some examples?
Thanks
Can you give me an example of CoGroupByKey? I understand it may look like the sketch below. Sorry, I am a newbie to Dataflow, and reading code is the best way for me to learn.
step 1: sourcedata = p | beam.io.ReadFromText(...)
step 2: existing_table = (p
            | beam.Read(beam.io.BigQuerySource(my_query))
            | beam.Map(format_rows))
I assume the structure of sourcedata and the existing data is the same: <k, v>
step 3: source_existing_data = ((sourcedata, existing_table)
            | 'coGroupBy' >> beam.CoGroupByKey())
step 4: new_data = source_existing_data | beam.Filter(lambda kv: not list(kv[1][1]))  # keep keys with no existing match
step 5: new_data | bigquery_sink
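To make the CoGroupByKey semantics concrete without running a pipeline, here is a plain-Python sketch of the grouped structure (the keys and values are made up for illustration). In Beam, CoGroupByKey over the tuple (sourcedata, existing_table) yields (key, (source_values, existing_values)) pairs, with the value iterables in the same order as the input collections:

```python
from collections import defaultdict

def co_group_by_key(source, existing):
    """Mimics beam.CoGroupByKey() over (source, existing):
    returns {key: (source_values, existing_values)}."""
    grouped = defaultdict(lambda: ([], []))
    for k, v in source:
        grouped[k][0].append(v)
    for k, v in existing:
        grouped[k][1].append(v)
    return dict(grouped)

source = [('a', 1), ('b', 2)]   # hypothetical parsed file rows
existing = [('b', 99)]          # hypothetical rows already in the table
result = co_group_by_key(source, existing)
# A key with an empty existing_values list is a new record.
new_keys = [k for k, (src, exist) in result.items() if not exist]
```

This is only a model of the semantics, not Beam itself; a real pipeline produces the same per-key pairing in parallel.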
Best answer
A side input is a fine choice, but consider that if your database table is very large, you may later find CoGroupByKey to be the better option. To implement this with a side input, do the following:
p = beam.Pipeline(..)
existing_table = beam.pvalue.AsDict(p
    | beam.Read(beam.io.BigQuerySource(my_query))
    | beam.Map(format_rows))
class FilterRowsDoFn(beam.DoFn):
    def process(self, elem, table_dict):
        k = elem[0]
        if k not in table_dict:
            yield elem

result = (p
    | beam.ReadFromText(...)
    | beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))
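The filtering logic inside the DoFn can be sanity-checked without a runner. Here is a minimal plain-Python sketch of the same idea (the sample rows are invented), where the side input is simply a dict keyed by the record key:

```python
# Plain-Python sketch of the side-input filter: keep only elements
# whose key is absent from the dict built from the existing table.
def filter_rows(elems, table_dict):
    for elem in elems:
        k = elem[0]
        if k not in table_dict:   # same check as FilterRowsDoFn.process
            yield elem

source = [('a', 1), ('b', 2), ('c', 3)]   # hypothetical parsed file rows
existing = {'b': 99}                      # side input built from BigQuery
new_rows = list(filter_rows(source, existing))
```

The dict lookup is O(1) per element, which is why the side-input approach works well as long as the existing table fits in memory on each worker.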
Then you can write the result to BQ. But again, if your table already contains many elements, you may want to consider using CoGroupByKey instead. The code to do this with CoGroupByKey should look something like this:
sourcedata = (p
    | beam.ReadFromText(...)
    | beam.Map(format_text))

existing_table = (p
    | beam.Read(beam.io.BigQuerySource(my_query))
    | beam.Map(format_rows))

source_existing_data = ((sourcedata, existing_table)
    | 'coGroupBy' >> beam.CoGroupByKey())

new_data = (source_existing_data
    | beam.Filter(lambda kv: not list(kv[1][1]))               # keep keys with no match in existing_table
    | beam.FlatMap(lambda kv: [(kv[0], s) for s in kv[1][0]]))  # re-emit the source values

result = new_data | bigquery_sink
If you run into any trouble with either of these snippets, let me know and we will fix it.