TLDR;

要为 celery 产生的每个进程运行一个初始化函数,您可以使用 worker_process_init 信号。正如您在 docs 中所读到的那样,该信号的处理程序不应阻塞超过 4 秒。
但是,如果我必须运行一个执行时间超过 4 秒的 init 函数,有哪些选择?

问题

我使用 C 扩展模块在 celery 任务中运行某些操作。此模块需要初始化,这可能需要几秒钟(可能 4 - 10)。因为我宁愿不为每个任务运行这个 init 函数,而是为每个产生的进程运行这个 init 函数,所以我使用了 worker_process_init 信号:

#lib.py
import isclient #c extension module
client = None
def init():
    global client
    client = isclient.Client() #this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)

#celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])

celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()

#tasks.py
from celery import celery
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)

当我运行此代码时会发生什么,这就是我在之前的问题 ( Celery: stuck in infinitly repeating timeouts (Timed out waiting for UP message) ) 中所描述的。简而言之:由于我的 init 函数需要超过 4 秒,有时会发生一个 worker 被杀死并重新启动,并且在重新启动过程中再次被杀死,因为这是 4 秒无响应后自动发生的情况。这最终会导致无限重复的终止和重启过程。

另一种选择是使用信号 worker_init 为每个 worker 只运行一次我的 init 函数。如果我这样做,我会遇到一个不同的问题:现在排队的进程由于某种原因卡住了。
当我以 3 的并发启动 worker,然后发送几个任务时,前三个将完成,其余的不会被触及。 (我认为这可能与以下事实有关,即 client 对象需要在多个进程之间共享,并且 C 扩展出于某些原因不支持这一点。但说实话,我相对较新多处理,所以我可以猜测)

问题

因此,问题仍然存在:如何为每个进程运行一个耗时超过 4 秒的 init 函数?有没有正确的方法来做到这一点,那会是什么方式?

最佳答案

Celery 将进程初始化超时限制为 4.0 秒。
检查 source code

要解决此限制,您可以考虑在创建 celery 应用程序之前更改它

from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0 #set this long enough

请注意,没有配置或设置可以更改此值。

关于python - Celery:运行冗长初始化函数的正确方法(每个进程),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/24202789/

10-14 16:07
查看更多