Running Celery Asynchronous Tasks and Periodic Tasks Together

This article describes how to run Celery asynchronous tasks and periodic tasks together, and may serve as a useful reference for anyone facing the same problem.

Problem Description

I am unable to run periodic tasks and asynchronous tasks together. If I comment out the periodic task, the asynchronous tasks execute fine; otherwise the asynchronous tasks get stuck.

Running: celery==4.0.2, Django==2.0, django-celery-beat==1.1.0, django-celery-results==1.0.1

Referred: https://github.com/celery/celery/issues/4184 to choose celery==4.0.2 version, as it seems to work.

celery.py

import django
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'bid.settings')

# Setup django project
django.setup()

app = Celery('bid')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

settings.py

INSTALLED_APPS = (
    ...
    'django_celery_results',
    'django_celery_beat',
)

# Celery related settings

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200, }
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_CONTENT_ENCODING = 'utf-8'
CELERY_ENABLE_REMOTE_CONTROL = False
CELERY_SEND_EVENTS = False
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

Periodic task

from celery.schedules import crontab
from celery.task import periodic_task
from django.utils import timezone

@periodic_task(run_every=crontab(hour=7, minute=30), name="send-vendor-status-everyday")
def send_vendor_status():
    return timezone.now()

Async task

from celery import shared_task

@shared_task
def vendor_creation_email(id):
    return "Email Sent"

Async task caller

vendor_creation_email.apply_async(args=[instance.id, ]) # main thread gets stuck here, if periodic jobs are scheduled.

Running the worker, with beat as follows

celery worker -A bid -l debug -B

Please help.

Solution

Here are a few observations, resulting from multiple trials and errors and from diving into Celery's source code.

  1. @periodic_task is deprecated. Hence it would not work.

From Celery's source code:

#venv36/lib/python3.6/site-packages/celery/task/base.py
def periodic_task(*args, **options):
    """Deprecated decorator, please use :setting:`beat_schedule`."""
    return task(**dict({'base': PeriodicTask}, **options))
  2. Use UTC as the base timezone to avoid timezone-related confusion later on. Configure periodic tasks to fire at times calculated with respect to UTC; e.g. for 'Asia/Calcutta', reduce the time by 5 hours 30 minutes, so the question's 7:30 AM job becomes crontab(hour=2, minute=0) in UTC.

  3. Create a celery.py as follows:

celery.py

import django
import os

from celery import Celery
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
# Setup django project
django.setup()

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
app.conf.beat_schedule = {
    'test_task': {
        'task': 'test_task',
        'schedule': crontab(hour=2, minute=0),  # 2:00 UTC == 7:30 AM Asia/Kolkata
    }
}

and the task can live in tasks.py under any app, as follows:

@shared_task(name="test_task")
def test_add():
    print("Testing beat service")
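
Alternatively, since the settings.py in the question configures CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler', the same schedule can be stored in the database instead of app.conf.beat_schedule. A minimal sketch, assuming django-celery-beat==1.1.0 as pinned in the question (run once, e.g. from a Django shell or a data migration):

from django_celery_beat.models import CrontabSchedule, PeriodicTask

# 2:00 UTC is the question's 7:30 AM Asia/Kolkata schedule expressed in UTC
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='2',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)

# The task field must match the registered task name from @shared_task
PeriodicTask.objects.get_or_create(
    name='send-vendor-status-everyday',
    task='test_task',
    crontab=schedule,
)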

Run the worker and beat as two separate processes, along with a broker such as Redis: celery worker -A proj -l info and celery beat -A proj -l info. This setup should work fine.
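
With the worker, beat, and the broker all running, the original symptom can be sanity-checked from the Django shell. A quick sketch, assuming the question's vendor_creation_email task lives in an app named vendors (the app name here is hypothetical):

from vendors.tasks import vendor_creation_email

result = vendor_creation_email.apply_async(args=[1])
# With the fix in place this returns "Email Sent" instead of blocking forever;
# .get() polls the django-db result backend configured in settings.py.
print(result.get(timeout=10))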

