celery执行异步任务
PS:特别需要注意创建工程的时候,实例化Celery所在的文件文件名称必须为celery.py,且包下有__init__.py文件,在实例化celery的时候必须将include参数添加正确
目录结构
study_celery # 工程文件夹
- proj # proj包
- __init__.py # 必须要含有__init__.py
- celery.py # 名称必须为celery.py,在此处编写定时任务代码
- tasks.py # 任务文件,可以有多个
- add_task1.py # 添加任务到backend中,执行异步任务
- add_task2.py # 添加任务到backend中,执行延迟任务
proj/celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://:123456@10.0.0.100:6379/1'
backend = 'redis://:123456@10.0.0.100:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])
app.conf.beat_schedule = {
'low-task': {
'task': 'proj.tasks.add', # celery_task下的task1的add方法
'schedule': timedelta(seconds=15), # 每15秒执行1次
# 'schedule': crontab(hour=8, day_of_week=2), # 每周二早八点
'args': (300, 150), # add所需要的参数
}
}
- 创建celery实例即app对象
- 执行存放任务broker地址和存放结果地址backend
- include参数是程序启动时倒入的模块列表,可以该处添加任务模块,便于worker能够对应相应的任务,列表中可以添加多个。
- 添加定时任务 beat
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
- 从celery中导入app对象
- 创建两个任务函数通过@app.task进行装饰
add_task1.py
from proj.tasks import add, mul
result = add.delay(1, 2)
result1 = mul.delay(8, 2)
print(result)
print(result1)
add_task2.py
from datetime import datetime, timedelta
from proj.tasks import add, mul
# 当前的utc时间向后延迟10s,默认使用utc时间
eta = datetime.utcnow() + timedelta(seconds=10)
# 10s后执行add和mul
ret1 = add.apply_async(args=(2, 3), eta=eta)
ret2 = mul.apply_async(args=(4, 5), eta=eta)
print(ret1)
print(ret2)
启动
# 需要cd到proj所在的目录下
# linux启动
celery -A proj worker -l info
# windows启动
celery -A proj worker -l info -P eventlet
---console---
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@6ae5fd398c10 v5.2.3 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-3.10.0-1160.59.1.el7.x86_64-x86_64-with-debian-9.11 2022-03-31 08:55:46
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: proj.celery:0x7f357cd743d0
- ** ---------- .> transport: redis://:**@127.0.0.1:6379/1
- ** ---------- .> results: redis://:**@127.0.0.1:6379/2
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. proj.tasks.add
. proj.tasks.mul
[2022-03-31 08:55:46,902: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1
[2022-03-31 08:55:46,904: INFO/MainProcess] mingle: searching for neighbors
[2022-03-31 08:55:47,914: INFO/MainProcess] mingle: all alone
[2022-03-31 08:55:47,925: INFO/MainProcess] celery@6ae5fd398c10 ready.
执行异步任务
# 需要cd到add_task1所在的目录下
python add_task.py
---console---
d6e5e56d-912f-40e6-b1ab-f08b1d59da98
af37bc9b-1cf9-43fe-87c4-fbd3f79d8066
---worker console---
[2022-03-31 08:58:09,033: INFO/MainProcess] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] received
[2022-03-31 08:58:09,048: INFO/MainProcess] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] received
[2022-03-31 08:58:09,061: INFO/ForkPoolWorker-1] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] succeeded in 0.02734477199999219s: 3
[2022-03-31 08:58:09,064: INFO/ForkPoolWorker-1] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] succeeded in 0.0013154429999531203s: 16
添加任务和worker先启动哪一个都可以,没有影响,当worker启动后,任务都会执行
执行延迟任务
# # 需要cd到add_task2.py所在的目录下
python add_task2.py
---console---
13e3a2ee-0bb5-4428-8a78-1fe29b499062
f68b434b-3957-466d-8294-5af66dfda468
此时worker所在的bash窗口等待片刻后输出
[2022-04-06 09:55:10,934: INFO/MainProcess] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] received
[2022-04-06 09:55:10,956: INFO/MainProcess] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] received
[2022-04-06 09:55:20,954: INFO/ForkPoolWorker-1] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] succeeded in 0.005011890999867319s: 20
[2022-04-06 09:55:20,957: INFO/ForkPoolWorker-1] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] succeeded in 0.0019722010001714807s: 5
执行定时任务
# 代码编写在proj/celery.py中
# worker启动后,新打开一个bash窗口输入
celery -A proj beat -l info
---console---
celery beat v5.2.3 (dawn-chorus) is starting.
__ - ... __ - _
LocalTime -> 2022-04-06 09:58:22
Configuration ->
. broker -> redis://:**@127.0.0.1:6379/1
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2022-04-06 09:58:22,686: INFO/MainProcess] beat: Starting...
[2022-04-06 09:58:22,696: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)
[2022-04-06 09:58:37,699: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)
此时看到,beat所在的bash窗口每隔15s调动一次任务
打开worker所在的任务窗口如下
[2022-04-06 09:58:22,704: INFO/MainProcess] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] received
[2022-04-06 09:58:22,706: INFO/ForkPoolWorker-1] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] succeeded in 0.0015727149998383538s: 450
[2022-04-06 09:58:37,704: INFO/MainProcess] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] received
[2022-04-06 09:58:37,707: INFO/ForkPoolWorker-1] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] succeeded in 0.0025173189997076406s: 450