This post explains why Celery does not behave as expected with global variables, and how to handle the problem.

Problem description

from celery import Celery

app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

a_num = 0

@app.task
def addone():
    global a_num
    a_num = a_num + 1
    return a_num

This is the code I used to test Celery. I expected the return value to increase every time addone() is called, but it is always 1. Why?

Result

$ python
>>> from tasks import addone
>>> r = addone.delay()
>>> r.get()
1
>>> r = addone.delay()
>>> r.get()
1
>>> r = addone.delay()
>>> r.get()
1


Recommended answer

By default, Celery starts a worker with a concurrency of 4, which means it launches 4 processes to handle task requests (plus one process that controls the others). I don't know what algorithm is used to assign task requests to the processes started for a worker, but eventually, if you execute addone.delay().get() enough times, you'll see a number greater than 1. What happens is that each process (not each task) gets its own copy of a_num. When I tried it here, my fifth execution of addone.delay().get() returned 2.
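The per-process behaviour is easy to reproduce without Celery at all. The following sketch (my own illustration, not code from the question) uses the standard-library multiprocessing pool to show that each process increments its own private copy of a_num:

```python
# Illustration (plain multiprocessing, no Celery): a module-level global
# is private to each worker process, so increments made in one process
# are invisible to the others.
import multiprocessing as mp

a_num = 0

def addone(_):
    global a_num
    a_num += 1
    return a_num

def run(**pool_kwargs):
    with mp.Pool(**pool_kwargs) as pool:
        # chunksize=1 hands the calls to the workers one at a time.
        return pool.map(addone, range(4), chunksize=1)

if __name__ == '__main__':
    # One process handling every request: the counter behaves as hoped.
    print(run(processes=1))                      # [1, 2, 3, 4]
    # A fresh process for each request: every call sees its own
    # a_num == 0, which is exactly the "always 1" symptom above.
    print(run(processes=4, maxtasksperchild=1))  # [1, 1, 1, 1]
```

maxtasksperchild=1 forces the pool to replace each worker after a single task, so every call lands in a brand-new process with a brand-new copy of a_num.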

You could force the number to increment each time by starting your worker with a single process to handle requests (e.g. celery -A tasks worker -c1). However, if you ever restart the worker, the numbering will reset to 0. Moreover, I would not design code that works only if the number of processes handling requests is forced to be 1. One day down the road, a colleague decides that multiple processes should handle the requests for the tasks, and then things break. (Big fat warnings in code comments can only do so much.)

At the end of the day, such state should be shared in a cache, like Redis, or in a database used as a cache, which would work for the code in your question.
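A hedged sketch of that approach: keep the counter in Redis, whose INCR command is atomic, so every worker process bumps the same shared value. The Celery wiring below is commented out because it assumes a local broker and Redis server plus the redis-py package (none of which appear in the question); the increment logic is factored out so it can be exercised with an in-memory stand-in:

```python
# Suggested fix (sketch): share the counter via external storage instead
# of a per-process global. redis-py's incr() maps to the atomic Redis
# INCR command, so concurrent workers cannot lose updates.

def addone_impl(store, key='a_num'):
    # `store` is anything with an atomic incr(key) -> new int,
    # e.g. redis.Redis(). Factored out so the logic runs without a server.
    return store.incr(key)

# Celery wiring, assuming a local broker and Redis server (assumptions):
#
#   from celery import Celery
#   import redis
#
#   app = Celery('tasks', broker='amqp://guest@localhost//')
#
#   @app.task
#   def addone():
#       return addone_impl(redis.Redis(host='localhost', port=6379))

class FakeStore:
    """Minimal in-memory stand-in for the Redis client, for illustration."""
    def __init__(self):
        self.data = {}

    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]
```

Against a real shared Redis, repeated calls return 1, 2, 3, … no matter which worker process handles each request, and the count survives worker restarts.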

However, in a comment you wrote:

Storing the connection in a cache won't work. I would strongly advocate having each process that Celery starts use its own connection rather than trying to share one among processes. The connection does not need to be reopened with each new task request: it is opened once per process, and then each task request served by that process reuses the connection.

In many cases, trying to share the same connection among processes (for instance, through virtual memory shared by a fork) would flat out not work anyway. Connections often carry state with them (e.g. whether a database connection is in autocommit mode). If two parts of the code expect the connection to be in different states, the code will behave inconsistently.
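That once-per-process pattern can be sketched with a lazily initialized, fork-aware module global (my own illustration; the connection factory passed in is hypothetical):

```python
# Sketch: open one connection per process and reuse it for every task
# that process serves. Comparing PIDs guards against a connection object
# silently surviving a fork into a child process.
import os

_conn = None
_conn_pid = None

def get_connection(factory):
    """Return this process's connection, creating it on first use.

    `factory` is a zero-argument callable (e.g. a hypothetical
    make_connection) that opens a fresh connection.
    """
    global _conn, _conn_pid
    if _conn is None or _conn_pid != os.getpid():
        _conn = factory()
        _conn_pid = os.getpid()
    return _conn
```

In a Celery worker you would call get_connection(...) at the top of each task body; Celery's worker_process_init signal is another common place to set up per-process resources.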

