本文介绍了Airflow-如何将xcom变量传递给Python函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要引用由 BashOperator 返回的变量。我可能做错了,请原谅我。在我的 task_archive_s3_file 中,我需要从 get_s3_file 获取文件名。该任务只是将 {{ti.xcom_pull(task_ids = submit_file_to_spark)}} 打印为字符串而不是值。

I need to reference a variable that's returned by a BashOperator. I may be doing this wrong so please forgive me. In my task_archive_s3_file, I need to get the filename from get_s3_file. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} as a string instead of the value.

如果我使用 bash_command ,则该值会正确打印。

If I use the bash_command, the value prints correctly.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file


推荐答案

{{ ti.xcom_pull(...)}} 只能在支持模板的参数内使用,否则将不会在执行前呈现它们。请参阅template_fields 和 template_ext 属性-airflow / blob / master / airflow / operators / python_operator.py#L49-L50 rel = noreferrer> PythonOperator 和。

Templates like {{ ti.xcom_pull(...) }} can only be used inside of parameters that support templates or they won't be rendered prior to execution. See the template_fields and template_ext attributes of the PythonOperator and BashOperator.

所以 templates_dict 是用来将模板传递给python运算符的方法:

So templates_dict is what you use to pass templates to your python operator:

def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

但是,在获取XCom值的情况下,另一种选择是使用<$通过上下文向您提供了c $ c> TaskInstance 对象:

However in the case of fetching an XCom value, another alternative is just using the TaskInstance object made available to you via context:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,

这篇关于Airflow-如何将xcom变量传递给Python函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 16:36
查看更多