I start a task that periodically updates its state, and I watch the result from the caller. After the caller's second polling cycle, however, the program raises a BacklogLimitExceeded exception (the task itself completes successfully after a while).

Caller:

    import time
    from celery import group, signature

    task = signature("worker.taskname", args=(url,), queue="worker")
    g = group(task).apply_async()
    while not g.ready():
        print(g[0].result)
        time.sleep(5)

Task:

    with open(filename, "wb") as w:
        fd = stream.open()
        while True:
            data = fd.read(2048)
            if not data:
                break
            w.write(data)
            size = w.tell()
            # taskname.update_state(meta={'size': size})

(If I comment out that line, everything works fine.)

I'm on Ubuntu 14.04, using RabbitMQ as both the broker and the result backend. Any ideas how to fix this?

Here is the exact stack trace:
    Traceback (most recent call last):
      File "main.py", line 55, in <module>
        while not g.ready():
      File "python3.4/site-packages/celery/result.py", line 503, in ready
        return all(result.ready() for result in self.results)
      File "python3.4/site-packages/celery/result.py", line 503, in <genexpr>
        return all(result.ready() for result in self.results)
      File "python3.4/site-packages/celery/result.py", line 259, in ready
        return self.state in self.backend.READY_STATES
      File "python3.4/site-packages/celery/result.py", line 394, in state
        return self._get_task_meta()['status']
      File "python3.4/site-packages/celery/result.py", line 339, in _get_task_meta
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "python3.4/site-packages/celery/backends/amqp.py", line 180, in get_task_meta
        raise self.BacklogLimitExceeded(task_id)
    celery.backends.amqp.BacklogLimitExceeded: 0a4fb653-0f05-48dc-ac43-fb0c8fbaba9a

Best answer

I recently got this error with Redis as the backend and looked into it a bit more. The error occurs because the backend has accumulated more than 1000 state messages for the task; when the loop that drains them reaches this default limit, you get this exception.

There are a few knobs that may help; result_expires is one of them. You can also raise the limit above 1000.

http://docs.celeryproject.org/en/latest/userguide/configuration.html#redis-backend-settings
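Another practical mitigation is simply to publish fewer state updates: with a message-based result backend, every update_state call enqueues a new status message, so updating once per 2 KiB chunk floods the result queue and trips the backlog limit. Below is a minimal sketch of throttling the updates; the report_progress callback stands in for a call like taskname.update_state(meta={'size': size}), which is an assumption about how your task is wired up:

```python
import io

CHUNK = 2048
UPDATE_EVERY = 64  # publish a state message at most once per 64 chunks (~128 KiB)

def copy_stream(src, dst, report_progress):
    """Copy src to dst in chunks, reporting progress only occasionally."""
    chunks = 0
    while True:
        data = src.read(CHUNK)
        if not data:
            break
        dst.write(data)
        chunks += 1
        if chunks % UPDATE_EVERY == 0:
            report_progress(dst.tell())
    report_progress(dst.tell())  # always report the final size once

# Usage with in-memory streams, just to illustrate the call pattern:
updates = []
src = io.BytesIO(b"x" * (CHUNK * 200))
dst = io.BytesIO()
copy_stream(src, dst, updates.append)
print(len(updates))  # only a handful of messages instead of 200
```

Independently of throttling, setting result_expires in your Celery configuration (e.g. app.conf.result_expires = 3600) lets stale result messages expire instead of piling up.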

Regarding "python - celery throws BacklogLimitExceeded", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/31635921/
