我想使用CompletionService来处理一系列线程完成时的结果。我有一个循环服务,以获取它提供的Future对象,但它们不可用,但是我不知道确定所有线程何时完成(从而退出循环)的最佳方法:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}

我已经尝试检查ThreadPoolExecutor的状态,但是我知道getCompletedTaskCount和getTaskCount方法仅是近似值,不应被依赖。有没有比我自己计算它们更好的方法来确保我从CompletionService中检索了所有期货?

编辑:Nobeh提供的链接和this link都建议先计算提交的任务数,然后多次调用take()。我只是感到惊讶,没有办法询问CompletionService或其执行程序还剩下什么要返回。

最佳答案

回答这些问题可以给您答案?

  • 您的异步任务是否创建提交给CompletionService的其他任务?
  • service是唯一应该处理您的应用程序中创建的任务的对象吗?

  • 基于reference documentationCompletionService采取消费者/生产者方法,并利用内部Executor。因此,只要您在一个位置生成任务并在另一位置使用它们,CompletionService.take()将表示是否还有其他结果要发出。

    我相信this question也可以为您提供帮助。

    08-06 16:56