我想在 Celery 任务超时(即任务未在给定时间内完成)时读取该任务的元信息。我有 3 个 Celery worker。当我依次在这 3 个 worker 上执行任务时,超时逻辑(从 Redis 后端获取元信息)可以正常工作;但是,当我使用线程并行执行任务时,会出现错误:AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'。

主脚本。

from threading import Thread
from util.tasks import app
from celery.exceptions import TimeoutError
# from celery.task.control import revoke
from celery.result import AsyncResult

def run(cmd, workerName, async=False, timeout=9999999):
        print "Executing Celery cmd: ", cmd
        ret = app.send_task(workerName+'.run_cmd', args=[cmd], kwargs={}, queue=workerName)
        if async:
            return ret
        else:
            try:
                return ret.get(timeout=timeout)
            except TimeoutError:
                task = AsyncResult(ret.task_id)
                # print task.info
                out = task.info['PROGRESS']
                # stop_task(ret.task_id)
                print 'TIMEOUT', out
                return 'TIMEOUT', out


cmd = r'ping 10.10.10.10'
threads = []

# this block works
print "This block works"
run(cmd, 'MH_VTF203', timeout=10)
run(cmd, 'MH_VTF1661', timeout=10)
run(cmd, 'MH_VTF106', timeout=10)


# this block errors
print "This block erros"
for vtf in ['MH_VTF203', 'MH_VTF1661', 'MH_VTF106']:
    t = Thread(target=run, args=[cmd, vtf], kwargs={'timeout': 10})
    t.start()
    threads.append(t)
for t in threads:
    t.join()

util/tasks.py:
from celery import Celery
import subprocess


# Celery application object; the client script imports it from this module.
# The broker is the local Redis instance (db 0); a Redis result backend is
# requested here and then set explicitly below.
app = Celery('tasks', backend='redis://', broker='redis://localhost:6379/0')
# Do not discard task results/state, so callers can read them back.
app.conf.CELERY_IGNORE_RESULT = False
# Store results in the same Redis instance/db that is used as the broker.
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'


@app.task()
def run_cmd(*args, **kwargs):

    cmd = " ".join(args)
    print "executing command :",cmd
    try:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out = ""
        while p.poll() is None:
            l = p.stdout.readline()
            print l
            out += l
            run_cmd.update_state(
                state='PROGRESS',
                meta={'PROGRESS': out}
            )
        l = p.stdout.read()
        print l
        out += l
        return out
    except subprocess.CalledProcessError, e:
        print 'Error executing command: ', cmd
        return str(e)

输出。
C:\Python27\python.exe C:/Users/mkr/Documents/work/New_RoD/testing/run.py
    This block works
    Executing Celery cmd:  ping 10.10.10.10
    TIMEOUT
    Pinging 10.10.10.10 with 32 bytes of data:
    Request timed out.
    Request timed out.

    Executing Celery cmd:  ping 10.10.10.10
    TIMEOUT
    Pinging 10.10.10.10 with 32 bytes of data:
    Request timed out.
    Request timed out.

    Executing Celery cmd:  ping 10.10.10.10
    TIMEOUT
    Pinging 10.10.10.10 with 32 bytes of data:
    Request timed out.
    Request timed out.

    This block erros
    Executing Celery cmd:  ping 10.10.10.10
    Executing Celery cmd:  ping 10.10.10.10
    Executing Celery cmd:  ping 10.10.10.10
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
        self.run()
      File "C:\Python27\lib\threading.py", line 763, in run
        self.__target(*self.__args, **self.__kwargs)
      File "C:/Users/mkr/Documents/work/New_RoD/testing/run.py", line 18, in run
        out = task.info['PROGRESS']
      File "C:\Python27\lib\site-packages\celery\result.py", line 356, in result
        return self._get_task_meta()['result']
      File "C:\Python27\lib\site-packages\celery\result.py", line 339, in _get_task_meta
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "C:\Python27\lib\site-packages\celery\backends\base.py", line 292, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
        self.run()
      File "C:\Python27\lib\threading.py", line 763, in run
        self.__target(*self.__args, **self.__kwargs)
      File "C:/Users/mkr/Documents/work/New_RoD/testing/run.py", line 18, in run
        out = task.info['PROGRESS']
      File "C:\Python27\lib\site-packages\celery\result.py", line 356, in result
        return self._get_task_meta()['result']
      File "C:\Python27\lib\site-packages\celery\result.py", line 339, in _get_task_meta
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "C:\Python27\lib\site-packages\celery\backends\base.py", line 292, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

    Exception in thread Thread-3:
    Traceback (most recent call last):
      File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
        self.run()
      File "C:\Python27\lib\threading.py", line 763, in run
        self.__target(*self.__args, **self.__kwargs)
      File "C:/Users/mkr/Documents/work/New_RoD/testing/run.py", line 18, in run
        out = task.info['PROGRESS']
      File "C:\Python27\lib\site-packages\celery\result.py", line 356, in result
        return self._get_task_meta()['result']
      File "C:\Python27\lib\site-packages\celery\result.py", line 339, in _get_task_meta
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "C:\Python27\lib\site-packages\celery\backends\base.py", line 292, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'


    Process finished with exit code 0

最佳答案

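改用 app.AsyncResult(而不是直接使用 celery.result.AsyncResult)后,问题解决了。直接构造的 AsyncResult 没有绑定到已配置的 app;在新线程中它会回退到未配置结果后端的默认 app,因此得到的是 DisabledBackend。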

关于python - AttributeError : 'DisabledBackend' object has no attribute '_get_task_meta_for' ,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31098547/

10-09 13:52