我有一个简单的任务,具有多个具有简单结构的任务,任务A,B和C可以在开始时运行而没有任何依赖关系,但是任务D依赖于A,这不是我的问题:

任务A,B和C每天运行,但是我需要任务D在A成功之后每周运行。如何设置此dag?

更改任务的schedule_interval是否有效?有没有针对此问题的最佳实践?

谢谢你的帮助。

最佳答案

您可以使用ShortCircuitOperator执行此操作。

import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'schedule_interval': '0 10 * * *'
}

dag = DAG(dag_id='example', default_args=args)

a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)

def check_trigger(execution_date, **kwargs):
    return execution_date.weekday() == 0

check_trigger_d = ShortCircuitOperator(
  task_id='check_trigger_d',
  python_callable=check_trigger,
  provide_context=True,
  dag=dag
)

a.set_downstream(b)
b.set_downstream(c)
a.set_downstream(check_trigger_d)
# Perform D only if trigger function returns a true value
check_trigger_d.set_downstream(d)

关于python-3.x - 如何在单个Dag Airflow 中处理不同的任务间隔?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48440262/

10-14 19:38
查看更多