几个运算符允许提取数据,但我从未设法使用结果。

例如:
https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py

该运算符的名称如下:

get_data = BigQueryGetDataOperator(
      task_id='get_data_from_bq',
      dataset_id='test_dataset',
      table_id='Transaction_partitions',
      max_results='100',
      selected_fields='DATE',
      bigquery_conn_id='airflow-service-account'
      )

但是,get_data的类型为DAG,但第116行显示“return table_data”。
需要明确的是,运算符(operator)可以工作并检索数据,我只是不了解如何使用数据检索/数据位于何处。

如何使用上面的“get_data”获取数据?

最佳答案

下一个任务中使用get_data的方式可以是PythonOperator,然后可以使用BigQueryGetDataOperator来处理数据。

get_data = BigQueryGetDataOperator(
      task_id='get_data_from_bq',
      dataset_id='test_dataset',
      table_id='Transaction_partitions',
      max_results='100',
      selected_fields='DATE',
      bigquery_conn_id='airflow-service-account'
      )

def process_data_from_bq(**kwargs):
      ti = kwargs['ti']
      bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
      # Now bq_data here would have your data in Python list
      print(bq_data)

process_data = PythonOperator(
      task_id='process_data_from_bq',
      python_callable=process_bq_data,
      provide_context=True
      )

get_data >> process_data

PS:我是ojit_code和Airflow committer/PMC的作者

关于python - 将运算符的结果保存在Apache Airflow中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53960327/

10-16 17:10