TLDR;
要为 celery 产生的每个进程运行一个初始化函数,您可以使用 worker_process_init
信号。正如您在 docs 中所读到的那样,该信号的处理程序不应阻塞超过 4 秒。
但是,如果我必须运行一个执行时间超过 4 秒的 init 函数,有哪些选择?
问题
我使用 C 扩展模块在 celery 任务中运行某些操作。此模块需要初始化,这可能需要几秒钟(可能 4 - 10)。因为我宁愿不为每个任务运行这个 init 函数,而是为每个产生的进程运行这个 init 函数,所以我使用了 worker_process_init
信号:
#lib.py
import isclient #c extension module
client = None
def init():
global client
client = isclient.Client() #this might take a while
def create_ne_list(text):
return client.ne_receiventities4datachunk(text)
#celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init
celery = Celery(include=[
'isc.ne.tasks'
])
celery.config_from_object('celeryconfig')
@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
init()
if __name__ == '__main__':
celery.start()
#tasks.py
from celery import celery
from lib import create_ne_list as cnl
@celery.task(time_limit=1200)
def create_ne_list(text):
return cnl(text)
当我运行此代码时会发生什么,这就是我在之前的问题 ( Celery: stuck in infinitly repeating timeouts (Timed out waiting for UP message) ) 中所描述的。简而言之:由于我的 init 函数需要超过 4 秒,有时会发生一个 worker 被杀死并重新启动,并且在重新启动过程中再次被杀死,因为这是 4 秒无响应后自动发生的情况。这最终会导致无限重复的终止和重启过程。
另一种选择是使用信号
worker_init
为每个 worker 只运行一次我的 init 函数。如果我这样做,我会遇到一个不同的问题:现在排队的进程由于某种原因卡住了。当我以 3 的并发启动 worker,然后发送几个任务时,前三个将完成,其余的不会被触及。 (我认为这可能与以下事实有关,即
client
对象需要在多个进程之间共享,并且 C 扩展出于某些原因不支持这一点。但说实话,我相对较新多处理,所以我可以猜测)问题
因此,问题仍然存在:如何为每个进程运行一个耗时超过 4 秒的 init 函数?有没有正确的方法来做到这一点,那会是什么方式?
最佳答案
Celery 将进程初始化超时限制为 4.0 秒。
检查 source code
要解决此限制,您可以考虑在创建 celery 应用程序之前更改它
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0 #set this long enough
请注意,没有配置或设置可以更改此值。
关于python - Celery:运行冗长初始化函数的正确方法(每个进程),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24202789/