我有一个线程池执行程序,它对批量传入的键列表执行相同的操作。所以我使用 invokeall() 方法对批处理中的键列表进行处理。用例是这样的,如果批处理中的任何任务返回错误,则没有必要继续处理其他键。所以

  • 一旦任务重新调整错误,我如何取消批处理执行的任务。
  • 但不影响另一批 key 的执行。即取消应每批次隔离。

  • 谢谢你的帮助。

    最佳答案

    如果没有一点定制,我不知道如何做到这一点。我能想到的最简单的实现需要:

  • 一个专门的 Future 实现,基本上是 FutureTask 的子类,它覆盖了 setException() 方法,以便在任务抛出异常时取消所有其他任务
  • 一个专门的 ThreadPoolExecutor 实现,它覆盖 invokeAll() 以利用自定义的 future

  • 它是这样的:

    对于自定义 future :
    import java.util.Collection;
    import java.util.concurrent.*;
    
    public class MyFutureTask<V> extends FutureTask<V> {
      private Callable<V> task;
      private Collection<Future<V>> allFutures;
    
      public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) {
        super(task);
        this.task = task;
        this.allFutures = allFutures;
      }
    
      @Override
      protected void setException(Throwable t) {
        super.setException(t);
        synchronized(allFutures) {
          for (Future<V> future: allFutures) {
            if ((future != this) && !future.isDone()) {
              future.cancel(true);
            }
          }
        }
      }
    }
    

    对于自定义线程池:
    import java.util.*;
    import java.util.concurrent.*;
    
    public class MyThreadPool extends ThreadPoolExecutor {
      public MyThreadPool(int size) {
        super(size, size, 1L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
      }
    
      @Override
      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        for (Callable<T> callable: tasks) {
          futures.add(new MyFutureTask<>(callable, futures));
        }
        for (Future<T> future: futures) {
          execute((MyFutureTask<T>) future);
        }
        for (Future<T> future: futures) {
          try {
            future.get();
          } catch (ExecutionException|CancellationException e) {
            // ignore this exception
          }
        }
        return futures;
      }
    }
    

    代码示例来测试它:
    import java.util.*;
    import java.util.concurrent.*;
    
    public class TestThreadPool {
      public static void main(final String[] args) {
        ExecutorService executor = null;
        try {
          int size = 10;
          executor = new MyThreadPool(size);
          List<Callable<String>> tasks = new ArrayList<>();
          int count=1;
          tasks.add(new MyCallable(count++, false));
          tasks.add(new MyCallable(count++, true));
          List<Future<String>> futures = executor.invokeAll(tasks);
          System.out.println("results:");
          for (int i=0; i<futures.size(); i++) {
            Future<String> f = futures.get(i);
            try {
              System.out.println(f.get());
            } catch (CancellationException e) {
              System.out.println("CancellationException for task " + (i+1) +
                ": " + e.getMessage());
            } catch (ExecutionException e) {
              System.out.println("ExecutionException for task " + (i+1) +
                ": " + e.getMessage());
            }
          }
        } catch(Exception e) {
          e.printStackTrace();
        } finally {
          if (executor != null) executor.shutdownNow();
        }
      }
    
      public static class MyCallable implements Callable<String> {
        private final int index;
        private final boolean simulateFailure;
    
        public MyCallable(int index, boolean simulateFailure) {
          this.index = index;
          this.simulateFailure = simulateFailure;
        }
    
        @Override
        public String call() throws Exception {
          if (simulateFailure) {
            throw new Exception("task " + index + " simulated failure");
          }
          Thread.sleep(2000L);
          return "task " + index + " succesful";
        }
      }
    }
    

    最后是执行测试的结果,如输出控制台中所示:
    results:
    CancellationException for task 1: null
    ExecutionException for task 2: java.lang.Exception: task 2 simulated failure
    

    关于java - ThreadPoolExecutor:当一个任务返回错误时从 invokeAll() 取消任务,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26185013/

    10-12 01:44
    查看更多