Dynamically creating a list of tasks

Problem description

I have a DAG which is created by querying DynamoDB for a list; for each item in the list a task is created using a PythonOperator and added to the DAG. Not shown in the example below, but it's important to note that some of the items in the list depend on other tasks, so I'm using set_upstream to enforce the dependencies.

- airflow_home
  \- dags
    \- workflow.py

workflow.py

# Imports assumed for context (Airflow 1.x-era paths, matching the
# provide_context/set_upstream usage described in the question)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()

# One PythonOperator per item returned from DynamoDB
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )
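
The dependency wiring mentioned above isn't shown in the question; a minimal sketch of how it might look, assuming each DynamoDB item carries a hypothetical depends_on list of upstream task ids:

created = {}
for task in tasks:
    created[task['id']] = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )

for task in tasks:
    for upstream_id in task.get('depends_on', []):  # 'depends_on' is a made-up field
        # set_upstream makes the current task wait for the named upstream task
        created[task['id']].set_upstream(created[upstream_id])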

The problem is workflow.py is getting run over and over (every time a task runs?) and my get_task_list() method is getting throttled by AWS and throwing exceptions.

I thought it was because whenever run_task() was called it was running all the globals in workflow.py, so I've tried moving run_task() into a separate module like this:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__.py
      \- task.py
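
Presumably the refactored files look something like this (the contents are inferred from the description, not shown in the question):

# mypackage/task.py
def run_task(task):
    # ... do stuff ...

# workflow.py then imports the callable instead of defining it
from mypackage.task import run_task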

But it didn't change anything. I've even tried putting get_task_list() into a SubDagOperator wrapped with a factory function, which still behaves the same way.

Is my problem related to these issues?

  • Tasks added to DAG during runtime fail to be scheduled
  • How to nest an Airflow DAG dynamically?

Also, why is workflow.py getting run so often, and why would an error thrown by get_task_list() cause an individual task to fail, when the task method doesn't reference workflow.py and has no dependencies on it?

Most importantly, what would be the best way to both process the list in parallel and enforce any dependencies between items in the list?

Recommended answer

As per the questions you referenced, Airflow doesn't support creating tasks while a DAG is running.

Therefore, what happens is that Airflow periodically generates the complete DAG definition before it starts a run. Ideally, the period of that regeneration would be the same as the schedule interval of the DAG.

BUT it might be that every time Airflow checks the DAG file for changes, it also regenerates the complete DAG, causing too many requests. That frequency is controlled by the min_file_process_interval and dag_dir_list_interval settings in airflow.cfg.
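
For example, raising those two settings in airflow.cfg makes the scheduler re-parse the DAG file (and therefore call get_task_list()) less often; the 300-second values below are purely illustrative:

[scheduler]
# minimum seconds before the scheduler re-parses the same DAG file
min_file_process_interval = 300
# seconds between scans of the DAGs folder for new files
dag_dir_list_interval = 300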

Regarding the failure of the tasks: they fail because the DAG creation itself failed, so Airflow wasn't able to start them.
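
Independent of those settings, one way to keep the file parseable despite throttling (an illustrative workaround, not part of the original answer) is to cache the DynamoDB result on disk, so most parses skip the query entirely:

import json
import os
import time

CACHE_PATH = '/tmp/task_list_cache.json'  # hypothetical location
CACHE_TTL = 300  # seconds; ideally at least the configured parse interval

def get_task_list_cached():
    # Serve a recent on-disk copy so the scheduler's frequent re-parsing
    # of workflow.py doesn't hit DynamoDB (and AWS throttling) every time.
    if os.path.exists(CACHE_PATH) and time.time() - os.path.getmtime(CACHE_PATH) < CACHE_TTL:
        with open(CACHE_PATH) as f:
            return json.load(f)
    tasks = get_task_list()  # the question's original DynamoDB query
    with open(CACHE_PATH, 'w') as f:
        json.dump(tasks, f)
    return tasks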
