I am considering loading a file into a dimension table. My solution is:


Beam.Read the file.
Create a side input from the database with the existing data.
In a ParDo: filter out the records that are already in the side input.
BigQuerySink the results into the database.


I would like to ask whether anyone has implemented this procedure. Could you give me some examples?
Thanks

Can you give me some examples of CoGroupByKey? I understand it may look like the sketch below. Sorry, I am a newbie to Dataflow, and reading code is the best way for me to learn.

step 1: sourcedata = beam.ReadFromText(...)
step 2: existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query))
                                    | beam.Map(format_rows))

I assume the structure of sourcedata and the existing data is the same: <k, v>
step 3: source_existing_data = ((sourcedata, existing_table)
                                | 'coGroupBy' >> beam.CoGroupByKey())


step 4: new_data = source_existing_data | beam.Filter(lambda kv: not list(kv[1][1]))

step 5: bigQuerySink(new_data)

Best answer

A side input is a good option, but consider that if your database table is quite large, you may later find that CoGroupByKey is the better choice. To implement this with a side input, do the following:

p = beam.Pipeline(..)
# Read the existing rows from BigQuery and materialize them as a dict side input.
existing_table = beam.pvalue.AsDict(p
                                    | beam.Read(beam.io.BigQuerySource(my_query))
                                    | beam.Map(format_rows))

class FilterRowsDoFn(beam.DoFn):
  def process(self, elem, table_dict):
    # elem is assumed to be a (key, value) tuple; emit it only if its
    # key is not already present in the side-input dict.
    k = elem[0]
    if k not in table_dict:
      yield elem

result = (p
          | beam.ReadFromText(...)
          | beam.ParDo(FilterRowsDoFn(), table_dict=existing_table))
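To see what the ParDo is doing, the membership check can be sketched in plain Python without Beam. The names `filter_rows`, `existing`, and `source` below, and the sample data, are made up for illustration; they mirror `FilterRowsDoFn.process` and the dict that `AsDict(...)` would produce:

```python
def filter_rows(elems, table_dict):
    # Mirror FilterRowsDoFn.process: keep only (key, value) pairs
    # whose key is absent from the side-input dict.
    for elem in elems:
        if elem[0] not in table_dict:
            yield elem

existing = {'a': 1, 'b': 2}      # what AsDict(...) would materialize
source = [('a', 10), ('c', 30)]  # parsed source records
print(list(filter_rows(source, existing)))  # [('c', 30)]
```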


Then you can write the result to BQ. But again, if your table already contains many elements, you may want to consider using CoGroupByKey.



The code to accomplish this with CoGroupByKey would look something like this:

sourcedata = (p
              | beam.ReadFromText(...)
              | beam.Map(format_text))
existing_table = (p
                  | beam.Read(beam.io.BigQuerySource(my_query))
                  | beam.Map(format_rows))

# Each output element is (name, (source_values, existing_values)).
source_existing_data = ((sourcedata, existing_table)
                        | 'coGroupBy' >> beam.CoGroupByKey())

new_data = (source_existing_data
            # Keep only keys with no match in the existing table ...
            | beam.Filter(lambda kv: not list(kv[1][1]))
            # ... and emit one (name, value) pair per source record.
            | beam.FlatMap(lambda kv: [(kv[0], s) for s in kv[1][0]]))

result = new_data | bigQuerySink(...)
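For intuition, the CoGroupByKey step can be simulated in plain Python. The function `co_group_by_key` and the sample data below are made up for illustration; the point is the (key, (source_values, existing_values)) shape that the filter and FlatMap then operate on:

```python
from collections import defaultdict

def co_group_by_key(source, existing):
    # Simulate beam.CoGroupByKey() over two keyed collections:
    # each key maps to ([source values], [existing values]).
    grouped = defaultdict(lambda: ([], []))
    for k, v in source:
        grouped[k][0].append(v)
    for k, v in existing:
        grouped[k][1].append(v)
    return dict(grouped)

source = [('a', 10), ('c', 30)]
existing = [('a', 1), ('b', 2)]
grouped = co_group_by_key(source, existing)
# New records: keys whose 'existing' list is empty, one pair per source value.
new_data = [(k, s) for k, (src, exist) in grouped.items() if not exist for s in src]
print(new_data)  # [('c', 30)]
```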


Let me know if you run into any trouble with either of those snippets and we will fix it.

07-24 21:46