假设我们有可能长期运行的任务:

public class LongRunningTask {
    public ReturnType doSomething() {
        ...
    }
}


并且我们想同时运行许多这些任务。

因此,我对此感到满意:

public class LongRunningCallable implements Callable<LongRunningTask> {
    private final LongRunningTask task;
    ...
    public ReturnType call() {
        return task.doSomething();
    }
    ...
}


现在,由于这可能会持续很长时间,因此我想将其限制为仅运行一定量。因此,我可能会执行以下操作:

public class InterruptibleCallable<T> implements Callable<T> {
        protected final long timeout;
        protected final TimeUnit timeUnit;
        protected final Callable<T> callable;

        public InterruptibleCallable(long timeout, TimeUnit timeUnit, Callable<T> callable) {
                this.timeout = timeout;
                this.timeUnit = timeUnit;
                this.callable = callable;
        }

        @Override
        public T call() throws Exception {

                ExecutorService executorService = Executors.newSingleThreadExecutor();

                T result = null;

                try {
                    result = executorService.submit(callable).get(timeout, timeUnit);
                } catch (InterruptedException e) {
                    LOGGER.error("Callable: " + callable.toString() + " was interrupted");
                    throw e;
                } catch (ExecutionException e) {
                    throw e;
                } catch (TimeoutException e) {
                    LOGGER.warn("Callable: " + callable.toString() + " timed out after " + timeout + " " + timeUnit);
                    throw e;
                } finally {
                    executorService.shutdown();
                }

                return result;
        }
}


没关系,但是现在我也想包装它,这样它在遇到异常时可以重试自身(有延迟):

public class RetryableCallable<T> implements Callable<T> {
    private final long delay;
    private final TimeUnit timeUnit;
    private final Callable<T> callable;

    @Override
    public T call() throws Exception {
        T result = null;

        try {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            result = executorService.submit(this.callable).get();
        } catch (Exception e) {
            ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
            result = executorService.schedule(this.callable, delay, timeUnit).get();
        }

        return result;
    }
}


现在,我的问题是:


是否已经有一个提供此功能的库(或超集)?
这是一个好的设计,尤其是在每个包装器中创建另一个执行程序并提交可调用对象时,为什么?
从设计模式视图和性能视图来看,解决此问题的最佳方法是什么?


谢谢:D

最佳答案

RetryableCallable包裹在InterruptibleCallable周围,将LongRunningTask包裹起来,并在每个RetryableCallable执行上创建两个额外的执行程序是不好的。

RetryableCallable捕获TimeoutException时,通常不应再次运行同一任务。这有点令人困惑,因为如果任务被超时“杀死”了,为什么还要再运行一次?
另外,为什么需要在此处创建另一个执行程序?把事情简单化

public class RetryableCallable<T> implements Callable<T> {
    private int retries = 3;

    @Override
    public T call() throws Exception {
        while (retries-- > 0) {
            try {
                return callable.call();
            } catch (InterruptedException e) {
                LOGGER.error("Callable: " + callable.toString() + " was interrupted");
                throw e;
            } catch (TimeoutException e) {
                LOGGER.warn("Callable: " + callable.toString() + " timed out after " + timeout + " " + timeUnit);
                throw e;
            } catch (Exception e) {
                LOGGER.warn("Callable: " + callable.toString() + " failed");
            }
        }
        throw new IllegalStateException();//or return null
    }
}


而且它不应该使用超时和延迟,它是RetryableCallable,而不是RetryableCallableWithDelayAndTimeoutAndSomethingElse

如果您想限制任务的执行时间,我至少可以看到4种好的方法:


在提交它的线程中调用get(time, timeunit)
call()函数中限制任务的执行时间,例如通过“定期”检查,我们还有更多时间。
使用内部的自定义执行程序和一个线程Auditor来创建自己的执行程序类,该线程将接受扩展TimeLimitedCallable并具有Callable功能的某些int getTimeLimit()。审核员将控制实施TimeLimitedCallable的所有正在运行的任务的时间范围。
为“任务的审计员”创建一个单独的执行者,并在将一个(或一堆)具有业务逻辑的任务提交给主要执行者后-创建并提交审计员任务到单独的执行者中。

09-06 19:28