本文介绍了如何在Airflow中动态创建子标记的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个主要的dag,它检索文件并将该文件中的数据拆分为单独的csv文件。
我必须为这些csv文件中的每个文件完成另一组任务。例如(上传到GCS,插入到BigQuery)
如何根据文件数为每个文件动态生成SubDag? SubDag将定义任务,例如上传到GCS,插入到BigQuery,删除csv文件)

I have a main dag which retrieves a file and splits the data in this file to separate csv files. I have another set of tasks that must be done for each file of these csv files. eg (Uploading to GCS, Inserting to BigQuery)How can I generate a SubDag for each file dynamically based on the number of files? SubDag will define the tasks like Uploading to GCS, Inserting to BigQuery, deleting the csv file)

现在,这就是它的样子

main_dag = DAG(....)
download_operator = SFTPOperator(dag = main_dag, ...)  # downloads file
transform_operator = PythonOperator(dag = main_dag, ...) # Splits data and writes csv files

def subdag_factory(): # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
    ...
    ...

我怎么称呼subdag_factory为在transform_operator中生成的每个文件?

How can I call the subdag_factory for each file generated in transform_operator?

推荐答案

我尝试如下动态创建 subdag s

I tried creating subdags dynamically as follows

# create and return and DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    tid_subdag = 'subdag_' + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

# create SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    airflow.utils.helpers.chain(*subdags)
    return subdags


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_subdag_operators(dag, db_names)

请注意,为其创建 subdag 的输入的列表,此处的 db_names 可以在 python 文件中静态声明,也可以从外部源中读取。

Note that the list of inputs for which subdags are created, here db_names, can either be declared statically in the python file or could be read from external source.

生成的 DAG 看起来像这样

The resulting DAG looks like this

潜入 SubDAG (s)

这篇关于如何在Airflow中动态创建子标记的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-17 05:07