本文介绍了Dask.Compute()中的重试数不清楚的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

从文档Number of allowed automatic retries if computing a result fails.

"Result"指的是每个单独的任务还是整个Computer()调用?

如果是指整个调用,如何对dask.delayed中的每个任务进行重试?

此外,我不确定重试是否有效,如下面的代码所示。

import dask
import random

@dask.delayed
def add(x, y):
    return x + y

@dask.delayed
def divide(sum_i):
    n = random.randint(0, 1)
    result = sum_i / n
    return result

tasks = []
for i in range(3):
    sum_i = add(i, i+1)
    divide_n = divide(sum_i)
    tasks.append(divide_n)

dask.compute(*tasks, retries=1000)

预期输出为(1,3,5),实际输出为ZeroDivisionError。

推荐答案

如果有人感兴趣,我们会为任务使用@Rtry修饰符,如下所示:

@dask.delayed
@retry(Exception, tries=3, delay=5)
def my_func():
    pass

重试修饰符:

from functools import wraps

def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
    """
    Retry calling the decorated function using an exponential backoff.

    Args:
        exceptions: The exception to check. may be a tuple of
            exceptions to check.
        tries: Number of times to try (not retry) before giving up.
        delay: Initial delay between retries in seconds.
        backoff: Backoff multiplier (e.g. value of 2 will double the delay
            each retry).
        logger: Logger to use.

    """
    if not logger:
        logger = logging.getLogger(__name__)

    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return f(*args, **kwargs)
                except exceptions as e:
                    msg = f"{e}, 
Retrying in {mdelay} seconds..."
                    logger.warning(msg)
                    sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            return f(*args, **kwargs)
        return f_retry  # true decorator

    return deco_retry

这篇关于Dask.Compute()中的重试数不清楚的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-01 18:49