在本篇文章中不会针对 celery 的某个部分话题进行深入介绍,而是记录一些学习和使用 celery 的过程中让我困惑的一些问题。
一、celery 做了什么
celery 官方文档介绍说 celery 是一个分布式任务队列,但是通过阅读其官方文档很难让开发者感受到是分布式的,因为官方并没有提供分布式的代码样例以及相关说明。
celery 是封装了消息队列,比如常用的 rabbitmq、kafka或者nysql、redis等等这种伪消息队列,对mysql、redis用类似 rabbitmq 和 kafak 中的概念进行了抽象,其实 celery 也没有自己去针对这些消息队列去进行抽象,而是大量的借助了 kombu 的代码, kombu 底层才是真正的将 消息队列给抽象了起来,那 celery 为什么又封装了一层?celery 其实是将消息队列中的生产者和消费者的使用方式进行了抽象,kombu 是将消息队列的使用方式进行了抽象。在 celery 的世界里可以忽略生产者和消费者概念,只考虑任务的概念,celery 帮助开发者将要执行的函数封装成了任务,这个任务一调用就会在本地机器执行或者在远程机器执行。这样开发者就可以专心的写业务逻辑了。
二、celery 中的任务是如何执行的
@app.task
def add(x, y):
return x+y
我们使用时都会像上面那段代码一样,将我们的函数用装饰器包装起来形成一个任务,这个任务当我们调用 add.delay(1,2) 时,其实调用了消息队列中的生产者,根据 celery 的配置文件,或者在装饰器中传递的参数会选择路由和队列从而将任务发送给消费者。消费者获取到任务就会将任务扔到进程池、线程池、协程中的某一个策略中去运行任务。
生产者发送的一定是任务的元数据信息,肯定不会发送一个对象的序列化格式过去,不是不可以,但是不建议那样做。元数据最重要的是任务的 id 以及 name。消费和会根据任务名称去寻找要执行的任务,消费者是如何根据任务名称找到将要执行的任务的,看下一小节。
三、celery 消费者如何根据任务名称找到要执行的任务
celery 中写好的任务文件不仅仅要在生产者(分发任务的机器) 上要存在,在消费者(启动 worker)d的机器上也要存在,这样任务才能注册到数据结构中,worker 收到任务的元数据才会找到要执行的任务,这一点官方文档没有样例,也没有明确的说明,所以很让人不解 celery是怎么个分布式的,要让 worker(消费者)在启动时能够将所有任务注册进来,有四种实现方式。
1)将 Celery 的实例与所有的 task 写在同一个文件中
mkdir /root/project
cd /root/project
touch __init__.py
touch celery.py
cat celery.py
from celery import Celery
app = Celery(include=["project.tasks"])
touch tasks.py
cat tasks.py
from project.celery import app
@app.task
def add(x,y):
reurn x+y
@app.task
def sub(x,y):
return x-y
这样在启动 worker 时要制定项目中 Celery 实例所在的路径,这个时候装饰器会执行,同时会将任务注册,但是这种方法在平时简单写写脚本还好,如果在工程化项目中就显得太混乱,没有分类,所有任务都在一个文件中,不清晰没有层次感。
2)命令行
在 worker 启动时有个 -I 的参数选项,这个选项可以以逗号进行分割传入一个或者多个任务文件路径,比如有两个文件存在任务
3) 在执行 Celery 类实例化时作为参数传入
app = Celery(include=["project.tasks"])
这种方式比较推荐
4) 在配置文件中指定
在 celery 的用户配置文件中有项 CELERY_IMPORTS = ("project.tasks",) 推荐使用这种方式,比较灵活,方便维护
将 celery 的实例和 相关的 tasks 都写入到一个文件是最不推荐在项目中使用的,如果是简单的工具没有问题,通过命令行的传入也不是很优雅,3和4种无论采用哪种都需要在分发任务和所有 worker 机器上进行一致性维护,所以还是采用配置文件的方式比较好,当文件修改后统一下发到各个机器上。