代码: Python 版本 2.7.x 和 Airflow 版本 1.5.1 我的 dag 脚本是这样的from airflow import DAGfrom airflow.operators import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'xyz','depends_on_past': False,'start_date': datetime(2015,10,13),'email': ['[email protected]'],'schedule_interval':timedelta(minutes=5),'email_on_failure': True,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=5),}dag = DAG('testing', default_args=default_args)run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)for i in range(5): t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) t.set_upstream(run_this_first)从中你可以看到我正在创建一个有 6 个任务的 DAG,第一个任务(Start1)首先开始,然后所有其他五个任务开始目前我在 DAG 的启动之间给出了 5 分钟的时间延迟对于第一种类型的所有六个任务,它已经完美运行,但五分钟后 DAG 没有重新启动已经超过 1 小时了,DAG 仍未重新启动,我真的不知道我错了。如果有人能指出我出了什么问题,那就太好了。我尝试使用 airflow testing clear 进行清除,然后发生了同样的事情。它首先运行,然后就站在那里。命令行显示的唯一内容是 Getting all instance for DAG testing当我更改 schedule_interval 的位置时,它只会在没有任何计划时间间隔的情况下并行运行。也就是说,在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔 代码 2:from airflow import DAGfrom airflow.operators import BashOperatorfrom datetime import datetime, timedeltadefault_args = {'owner': 'xyz','depends_on_past': False,'start_date': datetime(2015,10,13),'email': ['[email protected]'],'email_on_failure': True,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=5),}dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule hererun_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)for i in range(5): t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) t.set_upstream(run_this_first) 最佳答案 对于代码 2,我猜它每分钟运行的原因是: 开始时间为2015-10-13 00:00 调度间隔为5分钟 调度程序的每一次心跳(默认为 5 秒),您的 DAG 将被检查 第一次检查:开始日期(未找到最后执行日期)+调度程序间隔 执行时间将被记录。 (例如 2015-10-13 00:00 + 5min 下一次心跳的第二次检查:上次执行时间+调度程序间隔 .... 解决方案是将 DAG start_date 设置为 datetime.now() - schedule_interval 。而且如果你想调试: 在 settings.py 中将 LOGGINGLEVEL 设置为 debug 修改类方法 is_queueable() 的 airflow.models.TaskInstance 为 :def is_queueable(self, flag_upstream_failed=False): logging.debug('Checking whether task instance is queueable or not!') if self.execution_date > datetime.now() - self.task.schedule_interval: logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now())) return False ...关于python - Airflow 未正确调度 Python,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33126159/
10-16 17:10