我正在玩Celery,并且正在尝试使用CELERYBEAT_SCHEDULER进行定期任务。这是我的配置:

CELERY_TIMEZONE = 'Europe/Kiev'
CELERYBEAT_SCHEDULE = {
    'run-task-every-5-seconds': {
        'task': 'tasks.run_every_five_seconds',
        'schedule': timedelta(seconds=5),
        'options': {
            'expires': 10,
        }
    },
}

# the task
@app.task()
def run_every_five_seconds():
   return '5 seconds passed'

当使用celery -A celery_app beat运行节拍时,任务似乎没有过期。然后,我读到节拍可能有问题,因此它没有考虑到expires选项。

然后,我尝试执行一项任务,因此将其手动调用。
@app.task()
def print_hello():
    while True:
        print datetime.datetime.now()
        sleep(1)

我以这种方式调用任务:
print_hello.apply_async(args=[], expires=5)

工作人员的控制台告诉我我的任务将过期,但它也不会过期。它正在无限执行。
Received task: tasks.print_hello[05ee0175-cf3a-492b-9601-1450eaaf8ef7] expires:[2016-01-15 00:08:03.707062+02:00]

我做错什么了吗?

最佳答案

我认为您错误地理解了expires参数。

该文档说:“该任务将不会在过期时间后执行。” ref。这意味着如果超过了到期时间,则执行将不会开始。如果执行已经开始,则执行将完成。

您的配置每5秒将一个任务添加到任务队列。如果从将任务添加到任务队列起的10秒钟内执行仍未开始,则该任务将被丢弃。但是,由于有免费的 celery worker 可用,因此将立即执行任务。

您的代码示例添加了一个任务,如果在5秒钟内未开始执行,则该任务将被丢弃。

要获得所需的功能,可以将'expires': 10,替换为'expires': datetime.datetime.now() + timedelta(seconds=10),。这会将expires设置为绝对时间。

关于python - celery 的过期选项不起作用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/34800935/

10-13 07:22