使用RabbitMQ更新Celery中的任务

使用RabbitMQ更新Celery中的任务

本文介绍了使用RabbitMQ更新Celery中的任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在django项目中使用Celery创建任务以在将来的特定时间发送电子邮件.用户可以使用 notify_on datetime字段创建一个Notification实例.然后,我将 notify_on 的值作为 eta 传递.

I'm using Celery in my django project to create tasks to send email at a specific time in the future. User can create a Notification instance with notify_on datetime field. Then I pass value of notify_on as a eta.

class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()


def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance,), eta=instance.notify_on)

signals.post_save.connect(notification_post_save, sender=Notification)

该方法的问题在于,如果用户更改 notify_on ,他将收到两个(或更多)通知,而不是一个.

The problem with that approach is that if notify_on will be changed by the user, he will get two(or more) notifications instead of one.

问题是如何更新与特定通知相关的任务,或者以某种方式删除旧通知并创建新通知.

The question is how do I update the task associated with a specific notification, or somehow delete the old one and create new.

推荐答案

首先,通过使用 post_save ,我们无法获取旧数据.因此,这里我覆盖了 Notification 模型的 save() 方法.除此之外,创建一个字段来存储芹菜task_id.

First of all, by using post_save, we can't fetch the old data. So, here I'm overriding the save() method of the Notification model. Apart from that,create a field to store the celery task_id.

from celery.task.control import revoke


class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()
    celery_task_id = models.CharField(max_length=100)

    def save(self, *args, **kwargs):
        pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
        super().save(*args, **kwargs)
        post_notify_on = self.notify_on
        if not self.celery_task_id:  # initial task creation
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
        elif pre_notify_on != post_notify_on:
            # revoke the old task
            revoke(self.celery_task_id, terminate=True)
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)

参考

  1. 用Celery取消已经执行的任务?
  2. Django:如何访问原始(未修改的实例)保存在post_save信号中

这篇关于使用RabbitMQ更新Celery中的任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-25 11:04