我正在 Dataflow (Apache beam) 上创建一个管道来读取和写入 Google BigQuery 上的数据,但是我在创建 DAG 时遇到了问题,就像使用 Airflow 一样。
这是我的代码中的一个示例:
# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps
| 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )
我希望顺序执行这些任务,而不是 Dataflow 并行执行它们
我如何让它们按顺序执行?
最佳答案
由于您希望将中间步骤输出到 BigQuery 并在两个转换之间传输数据,我认为 Branch 将实现您想要的结果。
PCollection_1 = (从 BQ 读取).apply(Transform_1)
PCollection_1 .apply(写入 BQ)
PCollection_1 .apply(Transform_2).apply(写入BQ)
这将允许您在元素经过 Transform_1 并将该中间步骤写入 BQ 后在元素上应用 Transform_2。通过对同一个 PCollection 应用多个 ParDo,您可以在 DAG 中生成不同的分支。
关于google-cloud-dataflow - 创建 DAG 数据流 (apache Beam),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55869565/