我正在尝试使用APScheduler测试错误执行的任务,但是重新启动APScheduler时看不到丢失的任务在运行。我已将APScheduler配置如下:

scheduler.py

def configure_scheduler():
    jobstores = {
        'default': SQLAlchemyJobStore(url=config('DATABASE_URL'))
    }
    sched = BlockingScheduler()
    sched.configure(jobstores=jobstores)
    sched.add_job(
        test_task,
        id='test_task',
        'interval',
        hours=1,
        coalesce=True,
        max_instances=1,
        misfire_grace_time=360,
        replace_existing=True
    )
    return sched

if __name__ == '__main__':
    scheduler = configure_scheduler()
    scheduler.start()



第一次启动调度程序时,test_task被添加到Postgres数据库中的apscheduler_jobs表中,从我启动调度程序起的一小时内next_run_time为一个小时。然后,我尝试通过以下方法测试失火:


将数据库中的next_run_time更改为当前时间
等待15秒
启动调度程序


当我按照此过程操作时,next_run_time再次设置为距当前时间的一小时。 next_run_time似乎已在SQLAlchemy作业存储的update_job方法中更新。我已经看到一个与持久性作业存储任务相关的similar question在断火后没有运行。我见过的大多数other questions的解决方案是将misfire_grace_time参数添加到add_job。我已经按照上面的配置尝试过此操作,但是在调度程序启动时没有运气错过任何作业。我是否缺少与replace_existingmisfire_grace_time参数交互方式有关的内容?我是否需要手动检查所有作业的next_run_time是否过去,然后在启动调度程序之前运行这些作业?

我正在使用APScheduler库的v3.6.1。

对于其他情况,我将在Heroku上部署调度程序,并且尝试解决每天至少发生一次的Heroku的automatic dyno cycling

最佳答案

与AlexGrönholm(APScheduler的创建者)在APScheduler Gitter聊天室中的some discussion之后,我能够确定该作业在数据库中被“覆盖”了,因为我对replace_existing=True的呼叫中包含add_job。这将导致调度程序每次启动时都替换作业存储中的作业。

我的解决方法


在调度程序启动之前,请检查apscheduler_jobs表中是否存在现有作业。
对于数据库中的每个作业,请对照当前时间检查next_run_time。如果next_run_time是过去的,请立即运行作业。
使用replace_existing=True像以前一样计划作业。
启动调度程序。

关于python - APScheduler失火测试,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58583955/

10-12 13:00