Running Asynchronous Tasks with Celery

PS: Pay special attention when creating the project: the file that instantiates Celery must be named celery.py (this naming is what lets `celery -A proj` locate the app in this layout), the package must contain an __init__.py file, and the include argument must be set correctly when instantiating Celery.

Directory layout

study_celery				# project folder
	- proj					# the proj package
		- __init__.py		# the package must contain __init__.py
		- celery.py			# must be named celery.py; the periodic (beat) schedule is defined here
		- tasks.py			# task module; there can be more than one
	- add_task1.py			# submits tasks to the broker: asynchronous tasks
	- add_task2.py			# submits tasks to the broker: delayed (eta) tasks
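
The __init__.py only has to exist so that proj is importable as a package; it may stay empty. A common optional convention (not required for this example) is to re-export the app from it so other code can import it directly:

# proj/__init__.py -- optional; an empty file also works here
from .celery import app as celery_app

__all__ = ('celery_app',)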

proj/celery.py

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://:123456@10.0.0.100:6379/1'
backend = 'redis://:123456@10.0.0.100:6379/2'
app = Celery(__name__, broker=broker, backend=backend, include=['proj.tasks'])

app.conf.beat_schedule = {
    'low-task': {
        'task': 'proj.tasks.add',  # the add task defined in proj/tasks.py
        'schedule': timedelta(seconds=15),  # run every 15 seconds
        # 'schedule': crontab(hour=8, day_of_week=2),  # every Tuesday at 08:00
        'args': (300, 150),  # positional arguments passed to add
    }
}
  1. Create the Celery instance, i.e. the app object.
  2. Specify the broker URL, where pending tasks are stored, and the backend URL, where results are stored.
  3. The include argument is the list of modules to import when the worker starts; add your task modules here so the worker can resolve the corresponding tasks. The list may contain more than one module.
  4. Register the periodic-task (beat) schedule; a timezone/crontab sketch follows below.
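
If the crontab entries should be evaluated in local time rather than UTC, the timezone can be set on the app. A minimal sketch to append in proj/celery.py (the timezone string and the 'cron-task' entry are illustrative, not part of the original config):

# interpret schedules in local time instead of UTC (example timezone)
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

# an extra beat entry using crontab: every Tuesday at 08:00
app.conf.beat_schedule['cron-task'] = {
    'task': 'proj.tasks.mul',
    'schedule': crontab(hour=8, day_of_week=2),
    'args': (4, 5),
}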

proj/tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y 
  1. Import the app object from celery.py.
  2. Define two task functions and register them by decorating with @app.task; a retry sketch follows below.
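
@app.task also accepts options such as bind and retry settings. A hedged sketch of a task that retries itself on failure (the function name and delay values are illustrative, not part of the original project):

# bind=True passes the task instance in as `self`, enabling self.retry()
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def div(self, x, y):
    try:
        return x / y                  # may raise ZeroDivisionError
    except ZeroDivisionError as exc:
        raise self.retry(exc=exc)     # re-queued up to 3 times, 5 s apart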

add_task1.py

from proj.tasks import add, mul

result = add.delay(1, 2)
result1 = mul.delay(8, 2)

print(result)
print(result1)
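
delay() returns an AsyncResult, so the prints above only show the task ids. To read the actual values back from the Redis backend, the result handle can be polled or blocked on; a short sketch continuing add_task1.py:

print(result.ready())          # False until the worker has finished the task
print(result.get(timeout=10))  # blocks until done, then prints 3
print(result1.get(timeout=10)) # prints 16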

add_task2.py

from datetime import datetime, timedelta
from proj.tasks import add, mul

# 10 seconds after the current UTC time; eta is interpreted as UTC by default
eta = datetime.utcnow() + timedelta(seconds=10)
# run add and mul 10 seconds from now
ret1 = add.apply_async(args=(2, 3), eta=eta)
ret2 = mul.apply_async(args=(4, 5), eta=eta)
print(ret1)
print(ret2)
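
A simpler way to express "run 10 seconds from now" is the countdown argument, which saves building a UTC datetime by hand; a minimal equivalent sketch:

# equivalent to the eta above: execute 10 seconds from now
ret3 = add.apply_async(args=(2, 3), countdown=10)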

Starting the worker

# cd to the directory that contains proj
# Linux
celery -A proj worker -l info
# Windows (the default prefork pool is unsupported there; use the eventlet pool)
celery -A proj worker -l info -P eventlet

---console---

User information: uid=0 euid=0 gid=0 egid=0

 
 -------------- celery@6ae5fd398c10 v5.2.3 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.59.1.el7.x86_64-x86_64-with-debian-9.11 2022-03-31 08:55:46
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         proj.celery:0x7f357cd743d0
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/1
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/2
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . proj.tasks.add
  . proj.tasks.mul

[2022-03-31 08:55:46,902: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/1
[2022-03-31 08:55:46,904: INFO/MainProcess] mingle: searching for neighbors
[2022-03-31 08:55:47,914: INFO/MainProcess] mingle: all alone
[2022-03-31 08:55:47,925: INFO/MainProcess] celery@6ae5fd398c10 ready.
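
A few other standard worker flags are worth knowing (the values below are examples, not taken from the run above): -c sets the number of pool processes and -n gives the worker an explicit node name.

# example: 4 prefork processes with an explicit node name
celery -A proj worker -l info -c 4 -n worker1@%h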

Running the asynchronous tasks

# cd to the directory that contains add_task1.py
python add_task1.py


---console---
d6e5e56d-912f-40e6-b1ab-f08b1d59da98
af37bc9b-1cf9-43fe-87c4-fbd3f79d8066

---worker console---
[2022-03-31 08:58:09,033: INFO/MainProcess] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] received
[2022-03-31 08:58:09,048: INFO/MainProcess] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] received
[2022-03-31 08:58:09,061: INFO/ForkPoolWorker-1] Task proj.tasks.add[d6e5e56d-912f-40e6-b1ab-f08b1d59da98] succeeded in 0.02734477199999219s: 3
[2022-03-31 08:58:09,064: INFO/ForkPoolWorker-1] Task proj.tasks.mul[af37bc9b-1cf9-43fe-87c4-fbd3f79d8066] succeeded in 0.0013154429999531203s: 16

It does not matter whether the producer script or the worker is started first; tasks wait in the broker and are executed as soon as a worker is up.

Running the delayed tasks

# cd to the directory that contains add_task2.py
python add_task2.py

---console---
13e3a2ee-0bb5-4428-8a78-1fe29b499062
f68b434b-3957-466d-8294-5af66dfda468

The worker receives the tasks immediately but only executes them once the eta arrives, about 10 seconds later:

[2022-04-06 09:55:10,934: INFO/MainProcess] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] received
[2022-04-06 09:55:10,956: INFO/MainProcess] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] received
[2022-04-06 09:55:20,954: INFO/ForkPoolWorker-1] Task proj.tasks.mul[f68b434b-3957-466d-8294-5af66dfda468] succeeded in 0.005011890999867319s: 20
[2022-04-06 09:55:20,957: INFO/ForkPoolWorker-1] Task proj.tasks.add[13e3a2ee-0bb5-4428-8a78-1fe29b499062] succeeded in 0.0019722010001714807s: 5

Running the periodic task

# the schedule itself is defined in proj/celery.py
# with the worker already running, open a new shell and start beat
celery -A proj beat -l info

---console---
celery beat v5.2.3 (dawn-chorus) is starting.
__    -    ... __   -        _
LocalTime -> 2022-04-06 09:58:22
Configuration ->
    . broker -> redis://:**@127.0.0.1:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2022-04-06 09:58:22,686: INFO/MainProcess] beat: Starting...
[2022-04-06 09:58:22,696: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)
[2022-04-06 09:58:37,699: INFO/MainProcess] Scheduler: Sending due task low-task (proj.tasks.add)

The beat console shows it dispatching the task every 15 seconds, as scheduled.
The worker console shows the corresponding executions:

[2022-04-06 09:58:22,704: INFO/MainProcess] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] received
[2022-04-06 09:58:22,706: INFO/ForkPoolWorker-1] Task proj.tasks.add[c8b26f6a-f29f-43cf-9555-cd8746cbfb21] succeeded in 0.0015727149998383538s: 450
[2022-04-06 09:58:37,704: INFO/MainProcess] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] received
[2022-04-06 09:58:37,707: INFO/ForkPoolWorker-1] Task proj.tasks.add[82a406dc-c310-4bbb-ba83-a7d73bb2ae5e] succeeded in 0.0025173189997076406s: 450
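
For development it is often convenient to run beat inside the worker process instead of opening two shells; Celery's -B flag does exactly that (the docs recommend it for development only, since at most one such embedded beat may run):

# development shortcut: one process runs both the worker and beat
celery -A proj worker -B -l info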