我想重写Celery的Task类。我可以重写on_success和on_failure方法,但是run方法对我来说并不那么容易,我尝试使用bind方法。我的代码如下:

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def bind(self, app):
        return super(self.__class__, self).bind(app)

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        x = x**2


if __name__=="__main__":
     mytask = MyTask()
     app = Celery('mytask', backend='redis', broker='redis://localhost')
     mytask.bind(app)
     job = mytask.apply_async(data = 1)


但是当我运行代码时,出现以下错误:

Received unregistered task of type None.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: None


我搜索了很多,但没有结果。我应该如何注册任务?

最佳答案

显式绑定应用程序不是必需的,因为当调用apply_async时,celery任务将自动绑定到current_app。
如果要显式绑定,可以使用两种方法:
1. task.bind(app)
2.从app.Task创建Task类。通过继承app.Task,将绑定此应用程序。

您的问题与绑定无关。关于任务注册表。如下更正:

from celery import Celery, Task

class MyTask(Task):
    name = 'mytask'

    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        print(x**2)


if __name__ == "__main__":
    app = Celery('mytask', backend='redis', broker='redis://localhost')
    MyTask.bind(app)
    app.tasks.register(MyTask)
    app.worker_main()


希望对您有所帮助。

关于python - 如何将 celery 应用程序与Task类绑定(bind)?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43960320/

10-12 18:27
查看更多