Future通过get方法来获取异步任务的结果,如果任务还没有完成则阻塞线程,因为我们需要它的结果,所以等待是应该的。

如果需要处理一批这类任务,提交到线程池我们最终会得到多个Future,但是每个任务执行的时间可能并不相同,那么我们应该优先调用哪个Future的get呢?示例如下图:

Java异步任务优化CompletionService-LMLPHP

可以看到有些任务因为执行时间较长,而在他后面的任务可能执行任务的时间较短,已经提前完成了,但是并不能得到及时的处理,只能等到前面的任务执行完成后才能处理,这样设计是非常不合理的。

CompletionService解决

线程Java中提供了CompletionService来解决这个问题的,同样可以把一批任务提交到CompletionService,CompletionService可以把先执行完成的任务通过take方法获取到。使用方法如下图:

Java异步任务优化CompletionService-LMLPHP

ExecutorCompletionService是CompletionService的具体实现,我们把线程池设置给ExecutorCompletionService后,也可以通过submit提交任务到ExecutorCompletionService,最后通过take方法获取到已经完成的Future

从上面例子可以看到因为id等于3的任务先执行完成,所以也先处理了这个任务。最先提交的任务id等于0的因为休眠时间较长,所以先完成的任务就可以先执行处理

ExecutorCompletionService源码

从上面的例子可以看到使用ExecutorCompletionService有三个关键步骤:设置一个线程池、submit提交任务、take获取完成的Future

看源码首先看他的属性,查看源码得到他有两个关键属性:

Executor executor:执行线程的线程池;

BlockingQueue<Future> completionQueue:阻塞队列,保存完成的Future;

看到这两个属性就大概能够猜到它的实现方案:利用线程池去执行任务,利用阻塞队列来保存完成的Future

在submit方法肯定调用的是线程池的submit方法这个很简单,唯一的问题是如何把完成的Future放到阻塞队列中的?

上一篇文章中我们知道最终实现有返回的异步任务的类是FutureTask,最终运行的是FutureTask的run方法,run方法完成后会调用finishCompletion方法去唤醒那些因为调用FutureTask.get()方法而阻塞的线程,在finishCompletion的最后调用了done()方法,只不过在FutureTask中done()方法是一个空方法

而ExecutorCompletionService就是利用FutureTask的done()实现了把任务放到阻塞队列中。

ExecutorCompletionService有一个私有的内部类QueueingFuture,它继承FutureTask,并实现了done()方法,done()方法把任务放到了ExecutorCompletionService的阻塞队列中(所以QueueingFuture是私有类)。done()方法只有在任务执行完成后才会调用。

图解ExecutorCompletionService功能

通过以上分析基本上弄懂了ExecutorCompletionService的执行流程,分析如下图:

Java异步任务优化CompletionService-LMLPHP

可以看到整个源码实际上就只有三步:

submit方法把任务封装FutureTask并作为QueueingFuture的属性保存,然后提交QueueingFuture到线程池;

线程池执行任务调用的是QueueingFuture的run方法,run方法最后调用done方法把属性FutureTask(此时任务已经完成)添加到阻塞队列;

take方法从阻塞队列中获取FutureTask;

总结

ExecutorCompletionService实际上还是比较简单的一个类,提交任务利用的是线程池的提交,而他自己只创建了一个FutureTask的子类用来实现done方法,在任务完成后把FutureTask添加到阻塞队列中。take方法调用的是阻塞队列的take获取FutureTask。

Java程序员日常学习笔记,如理解有误欢迎各位交流讨论!

04-01 05:21