Dynamic Task Definition in Airflow

This article looks at how to handle dynamic task definition in Airflow; the question and answer below may be a useful reference if you are facing the same problem.

Problem Description

I’m currently trying to use Airflow to orchestrate a process where some operators are defined dynamically and depend on the output of another (earlier) operator.

In the code below, t1 updates a text file with new records (these are actually read from an external queue, but for simplicity I hard-coded them as A, B, and C here). Then I want to create a separate operator for each record read from that text file. These operators will create directories A, B, and C, respectively, and in the Airflow UI they will appear as separate bash tasks Create_directory_A, Create_directory_B, and Create_directory_C.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import paths
from airflow.operators.bash_operator import BashOperator

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    # Runs at execution time: writes one record per line.
    with open('text_file.txt', 'w') as text_file:
        for row in list_of_rows:
            text_file.write(row + '\n')


def read_text():
    # NOTE: this is called at module level below, so it runs every time the
    # scheduler parses this file, not when the DAG runs. Newlines are
    # stripped so the records are usable as task ids.
    with open('text_file.txt') as txt_file:
        return [line.strip() for line in txt_file]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{ params.dir_name }}",
        params={'dir_name': row},
        dag=dag
    )

    t1 >> t2

From what I can see, the scheduler should periodically execute it [the DAG] to reflect any changes that were made.

Recommended Answer

You cannot create tasks dynamically that depend on the output of an upstream task. You are mixing up schedule and execution time: a DAG definition and its tasks are created at schedule time, while a DAG run and its task instances are created at execution time. Only a task instance can produce output.
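
To make the distinction concrete, here is a minimal sketch of what runs when in a DAG file (illustrative only, not part of the original answer; the import paths assume Airflow 1.x, as in the question):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

# Module-level code runs every time the scheduler parses this file,
# i.e. at schedule time, long before any task instance exists.
rows = ['A', 'B', 'C']  # evaluated at parse time


def print_row(row):
    # This body runs only at execution time, inside a worker,
    # when a task instance of the task below actually executes.
    print(row)


dag = DAG('parse_vs_execute',
          start_date=datetime(2017, 3, 20),
          schedule_interval=None)

# The loop runs at parse time, so the set of tasks is fixed here.
for row in rows:
    PythonOperator(task_id='print_{}'.format(row),
                   python_callable=print_row,
                   op_args=[row],
                   dag=dag)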

The Airflow scheduler will build the dynamic graph with whatever text_file.txt contains at schedule time. These tasks are then shipped off to the workers.

A worker will eventually execute the t1 task instance and create a new text_file.txt, but at this point, the list of t2 tasks has already been calculated by the scheduler and sent off to the workers.

So, whatever the latest t1 task instance dumps into text_file.txt will be used the next time the scheduler decides it's time to run the DAG.

If your task is fast and your workers are not backlogged, that will be the contents from the previous DAG run. If they are backlogged, text_file.txt contents may be stale, and if you're really unlucky, the scheduler reads the file while a task instance is writing to it, and you'll get incomplete data from read_text().
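
One common workaround (a hypothetical sketch, not part of the original answer) is to move the per-record work into a single task, so that text_file.txt is read at execution time, after the t1 task instance has actually written it:

import os


def create_directories():
    # Runs at execution time, after t1 has written text_file.txt,
    # so it always sees the records from the current DAG run.
    with open('text_file.txt') as f:
        for name in (line.strip() for line in f if line.strip()):
            os.makedirs(name, exist_ok=True)


t2 = PythonOperator(
    task_id='Create_directories',
    python_callable=create_directories,
    dag=dag,
)

t1 >> t2

The trade-off is that the Airflow UI now shows a single Create_directories task instead of one task per record, but the data dependency on t1 is actually honored.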

That concludes this article on dynamic task definition in Airflow. I hope the recommended answer above is helpful.
