我正在 Dataflow (Apache beam) 上创建一个管道来读取和写入 Google BigQuery 上的数据,但是我在创建 DAG 时遇到了问题,就像使用 Airflow 一样。

这是我的代码中的一个示例:

# define pipeline
p = beam.Pipeline(argv=pipeline_args)
# execute query_1
query_result_gps = ( p | 'ReadFromBQ GPS_data' >> ... )
# write result from query_1 on BigQuery
output_gps = ( query_result_gps | 'WriteToBQ GPS_data' >> ... )
# execute query_2
query_result_temperature = (output_gps
                                    | 'ReadFromBQ temperature_data' >> ... )
# write result from query_2
ouput_temperature = ( query_result_temperature | 'WriteToBQ temperature_data' >> ... )

我希望顺序执行这些任务,而不是 Dataflow 并行执行它们

google-cloud-dataflow - 创建 DAG 数据流 (apache Beam)-LMLPHP

我如何让它们按顺序执行?

最佳答案

由于您希望将中间步骤输出到 BigQuery 并在两个转换之间传输数据,我认为 Branch 将实现您想要的结果。

PCollection_1 = (从 BQ 读取).apply(Transform_1)

PCollection_1 .apply(写入 BQ)

PCollection_1 .apply(Transform_2).apply(写入BQ)

这将允许您在元素经过 Transform_1 并将该中间步骤写入 BQ 后在元素上应用 Transform_2。通过对同一个 PCollection 应用多个 ParDo,您可以在 DAG 中生成不同的分支。

关于google-cloud-dataflow - 创建 DAG 数据流 (apache Beam),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55869565/

10-11 22:14
查看更多