I have a list that I loop over to create the tasks. The list is static as far as size.

    for counter, account_id in enumerate(ACCOUNT_LIST):
        task_id = f"bash_task_{counter}"
        if account_id:
            trigger_task = BashOperator(
                task_id=task_id,
                bash_command="echo hello there",
                dag=dag)
        else:
            trigger_task = BashOperator(
                task_id=task_id,
                bash_command="echo hello there",
                dag=dag)
            trigger_task.status = SKIPPED  # is there a way to somehow set the status of this to skipped instead of having a branch operator?
        trigger_task

I tried this manually but cannot make the task skipped:

    start = DummyOperator(task_id='start')
    task1 = DummyOperator(task_id='task_1')
    task2 = DummyOperator(task_id='task_2')
    task3 = DummyOperator(task_id='task_3')
    task4 = DummyOperator(task_id='task_4')

    start >> task1
    start >> task2

    try:
        start >> task3
        raise AirflowSkipException
    except AirflowSkipException as ase:
        log.error('Task Skipped for task3')

    try:
        start >> task4
        raise AirflowSkipException
    except AirflowSkipException as ase:
        log.error('Task Skipped for task4')

Solution

Yes, you need to raise AirflowSkipException:

    from airflow.exceptions import AirflowSkipException
    raise AirflowSkipException

For more information see the source code.
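To make the answer concrete, here is a minimal sketch of where the raise has to happen. The try/except in the question runs while the DAG file is being parsed and catches the exception immediately, so it never reaches a running task. The exception needs to be raised inside the code the task executes. The function name `run_for_account` and the sample `ACCOUNT_LIST` values are hypothetical, and the `ImportError` fallback is only there so the snippet can be tried on a machine without Airflow installed.

```python
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in so this sketch can be tried without Airflow installed.
    # Inside a real DAG file the import above is what gets used.
    class AirflowSkipException(Exception):
        pass

# Hypothetical stand-in for the ACCOUNT_LIST from the question.
ACCOUNT_LIST = ["acct-1", None, "acct-3"]


def run_for_account(account_id):
    """Task callable: skip when there is no account, otherwise do the work."""
    if not account_id:
        # Raised at run time, inside the task, so Airflow marks this
        # task instance as skipped instead of failed.
        raise AirflowSkipException("No account id, skipping this task")
    return f"hello there {account_id}"
```

In the loop from the question, each entry could then become a `PythonOperator` with `python_callable=run_for_account` and `op_args=[account_id]` in place of the two `BashOperator` branches, assuming the work can be done from Python rather than a shell command.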