我有一个特定的测试用例,在其中利用了celery后端,因此我需要task_always_eager设置为false,否则会得到RuntimeError。但是问题是,我需要所有其他测试用例都使用true进行设置,因此我仅针对此测试用例使用celery标记注释将celery config task_always_eager设置为false。但是,它似乎没有做任何事情。

这是我任务的框架:

@pytest.mark.celery(task_always_eager=False)
def test_task(self):
   # do some stuff to start a task
   do_stuff()
   # do some stuff to get info on a task
   get_info()
   # assertions

错误:
    def _ensure_not_eager(self):
        if self.app.conf.task_always_eager:
            raise RuntimeError(
               "Cannot retrieve result with task_always_eager enabled")
E          RuntimeError: Cannot retrieve result with task_always_eager enabled

/usr/local/lib/python3.6/site-packages/celery/backends/base.py:334: RuntimeError

TLDR:在此测试用例中,将task_always_eager设置为false会导致我做错什么,因为测试运行时它不会更改它吗?

最佳答案

Celery的配置对象是动态可更新的,因此您可以使用以下命令对其进行更新:
celery.conf.task_always_eager = False
celery.conf.CELERY_ALWAYS_EAGER = False,如果您使用的是pre- 4.0 Celery

您可以即时,逐项测试或在“setUp”操作过程中执行此操作。

关于python - 将 celery 设置task_always_eager更改为单个单元测试用例,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/51126547/

10-16 03:11