我正在尝试使用APScheduler测试错误执行的任务,但是重新启动APScheduler时看不到丢失的任务在运行。我已将APScheduler配置如下:
scheduler.py
def configure_scheduler():
jobstores = {
'default': SQLAlchemyJobStore(url=config('DATABASE_URL'))
}
sched = BlockingScheduler()
sched.configure(jobstores=jobstores)
sched.add_job(
test_task,
id='test_task',
'interval',
hours=1,
coalesce=True,
max_instances=1,
misfire_grace_time=360,
replace_existing=True
)
return sched
if __name__ == '__main__':
scheduler = configure_scheduler()
scheduler.start()
第一次启动调度程序时,
test_task
被添加到Postgres数据库中的apscheduler_jobs
表中,从我启动调度程序起的一小时内next_run_time
为一个小时。然后,我尝试通过以下方法测试失火:将数据库中的
next_run_time
更改为当前时间等待15秒
启动调度程序
当我按照此过程操作时,
next_run_time
再次设置为距当前时间的一小时。 next_run_time
似乎已在SQLAlchemy作业存储的update_job
方法中更新。我已经看到一个与持久性作业存储任务相关的similar question在断火后没有运行。我见过的大多数other questions的解决方案是将misfire_grace_time
参数添加到add_job
。我已经按照上面的配置尝试过此操作,但是在调度程序启动时没有运气错过任何作业。我是否缺少与replace_existing
和misfire_grace_time
参数交互方式有关的内容?我是否需要手动检查所有作业的next_run_time
是否过去,然后在启动调度程序之前运行这些作业?我正在使用APScheduler库的v3.6.1。
对于其他情况,我将在Heroku上部署调度程序,并且尝试解决每天至少发生一次的Heroku的automatic dyno cycling。
最佳答案
与AlexGrönholm(APScheduler的创建者)在APScheduler Gitter聊天室中的some discussion之后,我能够确定该作业在数据库中被“覆盖”了,因为我对replace_existing=True
的呼叫中包含add_job
。这将导致调度程序每次启动时都替换作业存储中的作业。
我的解决方法
在调度程序启动之前,请检查apscheduler_jobs
表中是否存在现有作业。
对于数据库中的每个作业,请对照当前时间检查next_run_time
。如果next_run_time
是过去的,请立即运行作业。
使用replace_existing=True
像以前一样计划作业。
启动调度程序。
关于python - APScheduler失火测试,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58583955/