我已经用多个PythonOperators编写了DAG
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
provide_context=True,
python_callable=Task1, dag=dag1)
def Task1(**kwargs):
return(kwargs['dag_run'].conf.get('file'))
我从PythonOperator调用“ Task1”方法。该方法正在返回一个值,该值需要传递给下一个PythonOperator。如何从“ task1”变量中获取该值,或者如何从Task1方法中返回该值?
更新 :
def Task1(**kwargs):
file_name = kwargs['dag_run'].conf.get[file]
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='file', value=file_name)
return file_name
t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)
t2 = BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
dag=dag,
)
t2.set_upstream(t1)
最佳答案
您可能想查看Airflow的XCOM:https://airflow.apache.org/concepts.html#xcoms
如果从函数返回值,则此值存储在xcom中。就您而言,您可以像从其他Python代码一样访问它:
task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
或像这样的模板中:
{{ task_instance.xcom_pull(task_ids='Task1') }}
如果要指定键,可以将其推入XCOM(在任务内):
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
然后,您可以像下面这样访问它:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
编辑1
后续问题:我不能在另一个函数中使用该值,而是将其传递给另一个PythonOperator,例如-“ t2 =” BashOperator(task_id ='Moving_bucket',bash_command ='python /home/raw.py“%s” '%file_name,dag = dag)“-我想访问“ Task1”返回的file_name。如何实现?
首先,在我看来,实际上该值不是传递给另一个
PythonOperator
而是传递给BashOperator
。其次,这已经在我上面的回答中涵盖了。字段
bash_command
是模板化的(请参见源中的template_fields
:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板版本:BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
dag=dag,
)
编辑2
说明:
Airflow的工作方式如下:它将执行Task1,然后填充xcom,然后执行下一个任务。因此,要使您的示例正常工作,您需要先执行Task1,然后在Task1的下游执行Moving_bucket。
由于使用的是返回函数,因此您也可以从
key='file'
中省略xcom_pull
而不在函数中手动进行设置。关于python - Python Airflow-从PythonOperator返回结果,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50149085/