Receiving events from a Celery task


Question





I have a long-running celery task which iterates over an array of items and performs some actions.

The task should somehow report back which item it is currently processing, so the end user is aware of the task's progress.

At the moment my django app and celery sit together on one server, so I am able to use Django's models to report the status, but I am planning to add more workers which are away from Django, so they can't reach the DB.

Right now I see a few solutions:

  • Store intermediate results manually using some storage, like redis or mongodb, making them available over the network. This worries me a little bit, because if, for example, I use redis, then I have to keep the code on the Django side (which reads the status) in sync with the Celery task (which writes the status), so they use the same keys.
  • Report status back to Django from celery using REST calls, like PUT http://django.com/api/task/123/items_processed
  • Maybe use Celery's event system and create events like "Item processed", on which Django updates a counter (see the sketch after this list)
  • Create a separate worker which runs on the server with django and holds a task which only increments an items-processed count, so when the main task is done with an item it issues increase_messages_proceeded_count.delay(task_id).
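
A rough sketch of the event-based option: newer Celery versions let a bound task emit custom events with send_event() (custom task event types should start with "task-"), and app.events.Receiver can consume them on the Django side. The event name task-item-processed, the monitor() helper and the broker URL below are illustrative assumptions, not existing code:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # assumed broker

@app.task(bind=True)
def process_items(self, items):
    for i, item in enumerate(items):
        ...  # perform the actual work on `item`
        # emit a custom task event; `uuid` (the task id) is attached automatically
        self.send_event('task-item-processed', current=i + 1, total=len(items))

def monitor(app):
    # long-running consumer on the Django side; update your counter in the handler
    def on_item_processed(event):
        print('task %s: %s/%s done' % (event['uuid'], event['current'], event['total']))

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers={
            'task-item-processed': on_item_processed,
        })
        # depending on your Celery version you may also need to start
        # the worker with events enabled (celery worker -E)
        receiver.capture(limit=None, timeout=None, wakeup=True)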

Are there any other solutions, or hidden problems with the ones I mentioned?

Solution

There are probably many ways to achieve your goal, but here is how I would do it.

Inside your long-running celery task, set the progress using django's caching framework:

from django.core.cache import cache

@app.task(bind=True)  # bind=True is required for the task to receive `self`
def long_running_task(self, *args, **kwargs):
    key = "my_task:%s" % self.request.id  # the id of the running task
    ...
    # do whatever you need to do and set the progress
    # using cache:
    cache.set(key, progress, timeout=3600)  # or whatever timeout works for you
    ...
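
The client has to learn that key somehow; one option, sketched below, is to have the view that launches the task return it (start_task_view is a hypothetical name, and the key format simply mirrors the task above):

import json

from django.http import HttpResponse

def start_task_view(request, *args, **kwargs):
    result = long_running_task.delay()  # kick off the task
    key = "my_task:%s" % result.id      # same format the task uses
    return HttpResponse(content=json.dumps({'task_key': key}),
                        content_type="application/json; charset=utf-8")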

Then all you have to do is make a recurring AJAX GET request with that key and retrieve the progress from cache. Something along those lines:

import json

from django.core.cache import cache
from django.http import HttpResponse

def task_progress_view(request, *args, **kwargs):
    key = request.GET.get('task_key')
    progress = cache.get(key)
    return HttpResponse(content=json.dumps({'progress': progress}),
                        content_type="application/json; charset=utf-8")

One caveat though: if you are running your server as multiple processes, make sure that you are using something like memcached, because django's default local-memory cache is per-process and will be inconsistent among them. Also, I probably wouldn't use celery's task_id as a key, but it is sufficient for demonstration purposes.
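
For reference, pointing Django at a shared memcached instance is a one-setting change; a sketch assuming memcached on the default local port (the backend class is the pymemcache one shipped with Django 3.2+; older Django versions use different memcached backends):

# settings.py: a shared cache backend so all processes see the same progress
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.PyMemcacheCache',
        'LOCATION': '127.0.0.1:11211',
    }
}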
