Question
I’m currently trying to use Airflow to orchestrate a process where some operators are defined dynamically and depend on the output of another (earlier) operator.
In the code below, t1 updates a text file with new records (these are actually read from an external queue but, for simplicity, I hard-coded them as A, B and C here). Then, I want to create separate operators for each record read from that text file. These operators will create directories A, B and C, respectively, and in the Airflow UI will be seen as separate bash processes Create_directory_A, Create_directory_B and Create_directory_C.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)

def create_text_file(list_of_rows):
    # Overwrite text_file.txt with one record per line.
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()

def read_text():
    # Strip the trailing newline so it does not end up in the task_id.
    txt_file = open('text_file.txt', 'r')
    return [element.rstrip('\n') for element in txt_file.readlines()]

t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

# This loop runs when the DAG file is parsed, not when t1 executes.
for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{ params.dir_name }}",
        params={'dir_name': row},
        dag=dag
    )
    t1 >> t2
From what I can see, the scheduler will execute it [the DAG file] periodically, so any changes should be reflected.
Answer
You cannot dynamically create tasks that depend on the output of an upstream task. You're mixing up schedule time and execution time. A DAG definition and its tasks are created at schedule time; a DAG run and its task instances are created at execution time. Only a task instance can produce output.
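To make the schedule-time vs. execution-time distinction concrete, here is a minimal standalone sketch (not part of the answer; the callable name and log messages are illustrative) of where the two kinds of code in the question's DAG file actually run:

import logging

log = logging.getLogger(__name__)

# Module-level code, including the `for row in read_text():` loop from the
# question, runs every time the scheduler parses the DAG file ("schedule
# time"). This message typically lands in the scheduler's DAG-parsing logs,
# never in a task log.
log.info("Parsing the DAG file and building the task list")

def some_python_callable(**kwargs):
    # Code inside a python_callable runs only when a worker executes a task
    # INSTANCE ("execution time"), so whatever it produces cannot feed the
    # task list that was already built during parsing.
    log.info("Executing a task instance on a worker")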
The Airflow scheduler will build the dynamic graph with whatever text_file.txt contains at schedule time. These tasks are then shipped off to the workers.
A worker will eventually execute the t1 task instance and create a new text_file.txt, but at this point the list of t2 tasks has already been calculated by the scheduler and sent off to the workers.
So, whatever the latest t1 task instance dumps into text_file.txt will be used the next time the scheduler decides it's time to run the DAG.
If your task is fast and your workers are not backlogged, that will be the contents from the previous DAG run. If they are backlogged, the text_file.txt contents may be stale, and if you're really unlucky, the scheduler reads the file while a task instance is writing to it, and you'll get incomplete data from read_text().
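A consequence worth spelling out (this is not part of the answer, just a hedged sketch): if the per-record work is moved inside a single task's callable, it runs at execution time and always sees the text_file.txt that t1 wrote in the same run; the trade-off is that the UI shows one task instead of separate Create_directory_* tasks. The task_id and helper name below are made up for illustration, and it assumes t1 and this task share a filesystem, just as the question's code already does.

import os

from airflow.operators.python_operator import PythonOperator

def create_directories():
    # Runs on a worker at execution time, after t1 has rewritten the file,
    # so it always works from the current records.
    with open('text_file.txt') as txt_file:
        for row in txt_file:
            dir_name = row.strip()
            if dir_name and not os.path.isdir(dir_name):
                os.mkdir(dir_name)

t3 = PythonOperator(
    task_id='Create_directories',
    python_callable=create_directories,
    dag=dag
)

t1 >> t3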