# Celery Usage
1. Broker
2. Basic example
Use Redis as both the broker and the result backend.
Create tasks.py:
```python
# tasks.py
from celery import Celery

di = 'redis://:****@localhost:6379/0'
app = Celery('tasks', backend=di, broker=di)

@app.task
def add(x, y):
    return x + y
```
Run the worker:
```
celery -A tasks worker -l info -P eventlet
```
Create temp.py:
```python
# temp.py
from tasks import add

rv = add.delay(4, 4)
```
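Since `backend=di` is passed to `Celery()`, the `AsyncResult` returned by `delay()` can in principle be queried. A minimal sketch (note that the worker log below reports `results: disabled://`, so this only works when the result backend is actually enabled):
```python
# continuation of temp.py -- a minimal sketch
print(rv.id)               # the id assigned to this task
print(rv.ready())          # True once the worker has finished it
print(rv.get(timeout=10))  # block for the result -> 8
```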
2.1 Run output
Running the worker (tasks.py):
```
E:\python\code test>celery -A tasks worker -l info -P eventlet

 -------------- celery@*** v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Windows 2019-09-21 22:08:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1aebfdcf98
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2019-09-21 22:08:04,802: INFO/MainProcess] Connected to redis://:**@192.168.199.113:6379/0
[2019-09-21 22:08:04,813: INFO/MainProcess] mingle: searching for neighbors
[2019-09-21 22:08:05,849: INFO/MainProcess] mingle: all alone
[2019-09-21 22:08:05,886: INFO/MainProcess] celery@*** ready.
[2019-09-21 22:08:05,905: INFO/MainProcess] pidbox: Connected to redis://:**@...../0.
```
Running temp.py:
```
[2019-09-21 22:11:27,198: INFO/MainProcess] Received task: tasks.add[06d745c6-5318-4f48-8a1e-2ab8f8563994]
[2019-09-21 22:11:27,200: INFO/MainProcess] Task tasks.add[06d745c6-5318-4f48-8a1e-2ab8f8563994] succeeded in 0.0s: 8
[2019-09-21 22:11:31,935: INFO/MainProcess] Received task: tasks.add[115c3b5d-eba7-472b-86ab-bd356f650e13]
[2019-09-21 22:11:31,936: INFO/MainProcess] Task tasks.add[115c3b5d-eba7-472b-86ab-bd356f650e13] succeeded in 0.0s: 8
```
2.2 Issues
Two issues came up while running this:
- redis-py version problem: the installed version was 2.*, and an upgrade was required:
  `pip install --upgrade redis`
- After upgrading to 4.***, the worker raised `ValueError: not enough values to unpack (expected 3, got 0)`.
Solution:
From what others describe, this error shows up when running Celery 4.x on Windows 10. The workaround (underlying cause unknown) is to install eventlet:
```
pip install eventlet
```
and then start the worker with an extra parameter:
```
celery -A tasks worker -l info -P eventlet
```
After that, tasks can be invoked normally.
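Not from the original notes, but another workaround commonly mentioned for this Windows issue is Celery's built-in solo pool, which runs tasks in the worker's main thread:
```
celery -A tasks worker -l info -P solo
```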
3. A more complex test setup
Generally, the code of a Celery project is split into three parts:
- worker definition
- task definitions
- task invocation
Structure:
```
proj/__init__.py
    /celery_worker.py   # worker definition
    /celery_tasks.py    # task definitions
    /celery_run.py      # invocation
```
proj/celery_worker.py
```python
# celery test -- worker
from celery import Celery

di_broker = 'redis://:123@192.168.199.113:6379/0'
di_backend = 'redis://:123@192.168.199.113:6379/1'

def create_worker():
    # app = Celery('tasks', broker=di)
    app = Celery('tasks', backend=di_backend, broker=di_broker,
                 include=['code_2.celery_tasks'])
    app.conf.update(result_expires=3600,)
    return app

app = create_worker()

if __name__ == '__main__':
    app.start()
```
proj/celery_tasks.py
```python
from celery_worker import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
```
proj/celery_run.py
```python
# celery test
from celery_tasks import add

rv = add.delay(4, 4)
out = rv.get(timeout=1)
print(out)
out = rv.ready()
print(out)
```
Start the worker:
```
celery -A celery_tasks worker -l info -P eventlet
```
Stop the worker with Ctrl+C.
The test environment is now in place; next, some more complex features are tested.
4. Calling tasks
Interface:
```python
add(4, 4)        # called locally, in the current process
add.delay(4, 4)  # sent to the broker and executed by a worker
```
This method is actually a star-argument shortcut to another method called apply_async():
```python
add.apply_async((2, 2))
```
More options can be passed:
```python
add.apply_async((2, 2), queue='lopri', countdown=10)
```
The line above sends the task to the lopri queue and waits at least 10 seconds before executing it.
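apply_async accepts a number of other execution options. A small sketch of commonly used ones (the values are illustrative only; mul is the task defined in celery_tasks.py above):
```python
from datetime import datetime, timedelta, timezone

# run no earlier than a specific point in time
add.apply_async((2, 2), eta=datetime.now(timezone.utc) + timedelta(seconds=30))

# discard the task if it has not started within 60 seconds
add.apply_async((2, 2), expires=60)

# run mul(result, 16) as a callback once add(2, 2) has finished
add.apply_async((2, 2), link=mul.s(16))
```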
Each task is assigned an id.
The delay and apply_async methods return an AsyncResult instance.
If a result backend is configured, the task's execution status can be inspected:
```
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
```
You can find the task's id by looking at the id attribute:
```
>>> res.id
'd6b3aea2-fb9b-4ebc-8da4-848818db9114'
```
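Because the id is stored in the result backend together with the outcome, it can be used later (even from another process) to rebuild the result handle. A minimal sketch, assuming the proj layout from section 3:
```python
from celery.result import AsyncResult
from celery_worker import app

res2 = AsyncResult(res.id, app=app)  # rebuild the handle from the stored id
print(res2.state)                    # e.g. 'SUCCESS'
print(res2.result)                   # 4
```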
You can also inspect the exception and traceback if the task raised an exception; in fact, result.get() will propagate any errors by default:
```
>>> res = add.delay(2)
>>> res.get(timeout=1)
```
If you don’t wish for the errors to propagate then you can disable that by passing the propagate argument:
```
>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)
```
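With propagation disabled, the exception object is returned as the value instead of being raised; the result's state can still be checked explicitly via the same AsyncResult API:
```
>>> res.failed()
True
>>> res.successful()
False
>>> res.state
'FAILURE'
```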
5. Server / worker
5.1 Basics
```
(vir_venv) E:\python\code>celery -A celery_tasks worker -l info -P eventlet

 -------------- celery@** v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Windows-8.1-6.3. 2019-09-22 10:50:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x38ac527d30
- ** ---------- .> transport:   redis://:**@***:6379/0
- ** ---------- .> results:     redis://:**@***:6379/1
- *** --- * --- .> concurrency: 4 (eventlet)   # concurrency level
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
```
Using eventlet here means each task runs in its own eventlet green thread.
The task events setting controls whether the worker emits monitoring events.
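For reference (not part of the original run), events can be switched on with -E when starting the worker and then watched with the built-in celery events monitor:
```
celery -A celery_tasks worker -l info -P eventlet -E
celery -A celery_tasks events
```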
5.2 Running in the background
```
celery multi start worker1 -A celery_worker -l info
celery multi restart w1 -A proj -l info
celery multi stop w1 -A proj -l info
# stopwait waits for running tasks to finish before shutting down
celery multi stopwait w1 -A proj -l info
```
6. Task composition / workflows
Tasks support the following method:
```
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
```
There's also a shortcut using star arguments:
```
>>> add.s(2, 2)
tasks.add(2, 2)
```
For example, wrapped in a small test function:
```python
def func2():
    r = add.s(2, 2)
    pr_type(r)              # pr_type: a local helper that prints the object's type
    rv = r.delay()
    out = rv.get(timeout=5)
    print(out)
    out = rv.ready()
    print(out)
```
A signature looks like functools.partial; in essence it is also just a wrapper around a task, and its purpose is to build more complex task structures.
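A quick sketch of that partial behaviour (my own illustration, not from the original notes): arguments supplied later are prepended to the ones already bound in the signature:
```python
partial = add.s(2)        # incomplete signature: only one argument bound
rv = partial.delay(4)     # resolves to add(4, 2)
print(rv.get(timeout=5))  # -> 6
```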
The supported composition structures are: group, chain, chord, map, starmap, chunks.
Taking group as an example:
```
>>> from celery import group
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```
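To round this off, here are sketches of chain and chord using the mul and xsum tasks defined in section 3 (my own examples; the numeric results follow from those definitions):
```python
from celery import chain, chord
from celery_tasks import add, mul, xsum

# chain: the result of each task is passed as the first argument of the next
res = chain(add.s(4, 4), mul.s(8))()   # (4 + 4) * 8
print(res.get(timeout=5))              # -> 64

# chord: run a group in parallel, then feed the list of results to a callback
res = chord(add.s(i, i) for i in range(10))(xsum.s())
print(res.get(timeout=5))              # -> 90 (sum of 0, 2, 4, ..., 18)
```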