本文介绍了重试丢失或失败的任务(Celery,Django和RabbitMQ)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以确定是否丢失任何任务并重试?

Is there a way to determine if any task is lost and retry it?

我认为丢失的原因可能是调度程序错误或工作线程崩溃。

I think that the reason for lost can be dispatcher bug or worker thread crash.

我打算重试它们,但是我不确定如何确定哪些任务需要退休?

I was planning to retry them but I'm not sure how to determine which tasks need to be retired?

如何自动执行此过程?我可以使用自己的自定义调度程序来创建新任务吗?

And how to make this process automatically? Can I use my own custom scheduler which will create new tasks?

编辑:我从文档中发现RabbitMQ从不释放任务,但是当工作线程崩溃时会发生什么呢?

I found from the documentation that RabbitMQ never loose tasks, but what happens when worker thread crash in the middle of task execution?

推荐答案

您需要设置的内容

CELERY_ACKS_LATE = True

CELERY_ACKS_LATE = True

最后一个确认表示任务消息将在执行任务后得到确认,
不仅在执行之前被确认,这是默认行为。
这样,如果工作程序崩溃,兔子MQ仍然会收到消息。

Late ack means that the task messages will be acknowledged after the task has been executed,not just before, which is the default behavior.In this way if the worker crashes rabbit MQ will still have the message.

很明显,同时发生了一次总崩溃(兔子+工作程序)除非您在任务开始和任务结束上实现登录,否则无法恢复任务。
就我个人而言,每次执行任务时,我都会在mongodb中写一行,而当任务完成时(独立于结果),在另一行中写一行,这样我就可以通过分析mongo日志来知道哪个任务被中断。

Obviously of a total crash (Rabbit + workers) at the same time there is no way of recovering the task, except if you implement a logging on task start and task end.Personally I write in a mongodb a line every time a task start and another one when the task finish (independently form the result), in this way I can know which task was interrupted by analyzing the mongo logs.

您可以通过覆盖方法 __ call __ after_return 轻松完成此操作

You can do it easily by overriding the methods __call__ and after_return of the celery base task class.

下面是我的一段代码,该代码使用taskLogger类作为上下文管理器(带有入口和出口点)。
taskLogger类仅在mongodb实例中编写包含任务信息的行。

Following you see a piece of my code that uses a taskLogger class as context manager (with entry and exit point).The taskLogger class simply writes a line containing the task info in a mongodb instance.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

I希望这可以帮助

这篇关于重试丢失或失败的任务(Celery,Django和RabbitMQ)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-03 08:55