我正在创建一个Airflow @daily DAG,它有一个BigQueryGetDataOperator上游任务get_daily_data
,它根据execution_date和依赖于下游的任务(PythonOperator)通过xcom_pull使用基于日期的数据来获取数据。当我运行airflow backfill命令(我正在执行xcom_pull的下游任务process_data_from_bq
)时,它仅获取最新数据,而不获取下游任务期望的同一执行日期的数据。
Airfow文档说如果我们通过如果将xcom_pull传递给task_ids的单个字符串,则将返回该任务的最新XCom值
但是,它并没有说如何获取DAG执行相同实例的数据。
我遇到了一个相同的问题How to pull xcom value from other task instance in the same DAG run (not the most recent one)?,但是给出的一个解决方案是我已经在做的事情。但似乎它不是正确的答案。
DAG定义:
dag = DAG(
'daily_motor',
default_args=default_args,
schedule_interval='@daily'
)
#This task creates data in a BigQuery table based on execution date
extract_daily_data = BigQueryOperator(
task_id='daily_data_extract',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
sql=policy_by_transaction_date_sql('{{ ds }}'),
destination_dataset_table='Test.daily_data_tmp',
dag=dag)
get_daily_data = BigQueryGetDataOperator(
task_id='get_daily_data',
dataset_id='Test',
table_id='daily_data_tmp',
max_results='10000',
dag=dag
)
#This is where I need to pull the data of the same execution date/same instance of DAG run not the most recent task run
def process_bq_data(**kwargs):
bq_data = kwargs['ti'].xcom_pull(task_ids = 'get_daily_data')
#This bq_data is most recent one not of the same execution date
obj_creator = IibListToObject()
items = obj_creator.create(bq_data, 'daily')
save_daily_date_wise(items)
process_data = PythonOperator(
task_id='process_data_from_bq',
python_callable=process_bq_data,
provide_context=True,
dag = dag
)
get_daily_data.set_upstream(extract_daily_data)
process_data.set_upstream(get_daily_data)
最佳答案
您必须获得最新的Xcom值。您还需要确保值实际上与假设的日期相同:
:param include_prior_dates:
如果为False,则仅来自当前的XCom
返回执行日期。
如果为True,则来自先前日期的XCom
也返回。
关于python - Airflow xcom_pull不提供相同上游任务实例运行的数据,而是提供最新数据,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56495616/