我有一个线程池执行程序,它对批量传入的键列表执行相同的操作。所以我使用 invokeall() 方法对批处理中的键列表进行处理。用例是这样的,如果批处理中的任何任务返回错误,则没有必要继续处理其他键。所以
谢谢你的帮助。
最佳答案
如果没有一点定制,我不知道如何做到这一点。我能想到的最简单的实现需要:
它是这样的:
对于自定义 future :
import java.util.Collection;
import java.util.concurrent.*;
public class MyFutureTask<V> extends FutureTask<V> {
private Callable<V> task;
private Collection<Future<V>> allFutures;
public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) {
super(task);
this.task = task;
this.allFutures = allFutures;
}
@Override
protected void setException(Throwable t) {
super.setException(t);
synchronized(allFutures) {
for (Future<V> future: allFutures) {
if ((future != this) && !future.isDone()) {
future.cancel(true);
}
}
}
}
}
对于自定义线程池:
import java.util.*;
import java.util.concurrent.*;
public class MyThreadPool extends ThreadPoolExecutor {
public MyThreadPool(int size) {
super(size, size, 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
List<Future<T>> futures = new ArrayList<>(tasks.size());
for (Callable<T> callable: tasks) {
futures.add(new MyFutureTask<>(callable, futures));
}
for (Future<T> future: futures) {
execute((MyFutureTask<T>) future);
}
for (Future<T> future: futures) {
try {
future.get();
} catch (ExecutionException|CancellationException e) {
// ignore this exception
}
}
return futures;
}
}
代码示例来测试它:
import java.util.*;
import java.util.concurrent.*;
public class TestThreadPool {
public static void main(final String[] args) {
ExecutorService executor = null;
try {
int size = 10;
executor = new MyThreadPool(size);
List<Callable<String>> tasks = new ArrayList<>();
int count=1;
tasks.add(new MyCallable(count++, false));
tasks.add(new MyCallable(count++, true));
List<Future<String>> futures = executor.invokeAll(tasks);
System.out.println("results:");
for (int i=0; i<futures.size(); i++) {
Future<String> f = futures.get(i);
try {
System.out.println(f.get());
} catch (CancellationException e) {
System.out.println("CancellationException for task " + (i+1) +
": " + e.getMessage());
} catch (ExecutionException e) {
System.out.println("ExecutionException for task " + (i+1) +
": " + e.getMessage());
}
}
} catch(Exception e) {
e.printStackTrace();
} finally {
if (executor != null) executor.shutdownNow();
}
}
public static class MyCallable implements Callable<String> {
private final int index;
private final boolean simulateFailure;
public MyCallable(int index, boolean simulateFailure) {
this.index = index;
this.simulateFailure = simulateFailure;
}
@Override
public String call() throws Exception {
if (simulateFailure) {
throw new Exception("task " + index + " simulated failure");
}
Thread.sleep(2000L);
return "task " + index + " succesful";
}
}
}
最后是执行测试的结果,如输出控制台中所示:
results:
CancellationException for task 1: null
ExecutionException for task 2: java.lang.Exception: task 2 simulated failure
关于java - ThreadPoolExecutor:当一个任务返回错误时从 invokeAll() 取消任务,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26185013/