我试图基于dagrun输入变量设置S3KeySensor的bucket_key。
我有一个dag“ dag_trigger”,它使用TriggerDagRunOperator来为dag“ dag_triggered”触发dagrun。我正在尝试扩展示例https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py

因此,我想将变量发送给触发的dag,然后根据变量的值在S3KeySensor任务中设置backet_key值。我知道如何在PythonOperator可调用函数中使用发送的变量,但是我不知道如何在传感器对象上使用它。

dag_trigger dag:

import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator


default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime.now()}

dag = DAG('dag_trigger', default_args=default_args, schedule_interval="@hourly")

def task_1_run(context, dag_run_object):
    sent_variable = '2018_02_19' # not important
    dag_run_object.payload = {'message': sent_variable}
    print "DAG dag_trigger triggered with payload: %s" % dag_run_object.payload)
    return dag_run_object

task_1 = TriggerDagRunOperator(task_id="task_1",
                               trigger_dag_id="dag_triggered",
                               provide_context=True,
                               python_callable=task_1_run,
                               dag=dag)


和dag_triggered dag:

import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor


default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime.now()
}

dag = DAG('dag_triggered', default_args=default_args, schedule_interval=None)

wait_files_to_arrive_task = S3KeySensor(
    task_id='wait_file_to_arrive',
    bucket_key='file_%s' % '', # Here I want to place conf['sent_variable']
    wildcard_match=True,
    bucket_name='test-bucket',
    s3_conn_id='test_s3_conn',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)


我试图使用dag.get_dagrun()。conf ['sent_variable']从dag对象获取值,但是我不确定如何设置dagrun create_date变量(dag_trigger每小时都会触发dag_triggered,而dag_triggered可能需要等待更长的时间)。

我还尝试创建PythonOperator,它将作为wait_files_to_arrive_task的上游。可调用的python函数可以获取有关send_variable的信息。之后,我尝试为bucket_key设置值,例如bucket_key = callable_function()-但参数有问题。

而且我也认为全局变量不是一个好的解决方案。

也许有人有想法可行。

最佳答案

直接在DAG文件中无法在DAG run conf中获取值。如果没有运行DAG的上下文,这是无法确定的。一种考虑方法是运行python my_dag.py以测试DAG文件是否编译时,它必须初始化所有这些运算符而无需指定执行日期。因此,不能因DAG运行而有所不同的任何内容。

因此,您可以将其作为模板值传递,稍后将在实际运行任务时使用上下文呈现该值。

wait_files_to_arrive_task = S3KeySensor(
    task_id='wait_file_to_arrive',
    bucket_key='file_{{ dag_run.conf["message"] }}',
    ...)


请注意,将仅呈现运算符template_fields中列出的参数。幸运的是,有人预料到了这一点,所以bucket_key确实是模板字段。

关于python - 如何在 Airflow 的S3KeySensor中动态添加bucket_key值,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48691203/

10-09 04:30