线程池相关
源码:
package java.util.concurrent;
import java.util.*;
public abstract class AbstractExecutorService implements ExecutorService {
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}
类 AbstractExecutorService
已实现的接口:
已知子类:
此类使用 newTaskFor 返回的 RunnableFuture
实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture
是此包中提供的 FutureTask
类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。
扩展示例。以下是一个类的简要介绍,该类定制 ThreadPoolExecutor
使用 CustomTask 类替代默认 FutureTask:
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
static class CustomTask<V> implements RunnableFuture<V> {...}
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
return new CustomTask<V>(c);
}
protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
return new CustomTask<V>(r, v);
}
// ... add constructors, etc.
}
方法摘要
(Collection<? extends Callable<T>> tasks)
执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
(Collection<? extends Callable<T>> tasks)
执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
protected
(Callable<T> callable)
为给定可调用任务返回一个 RunnableFuture。
protected
newTaskFor(Runnable runnable, T value)
为给定可运行任务和默认值返回一个 RunnableFuture。
(Callable<T> task)
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
Future<?>
(Runnable task)
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
submit(Runnable task, T result)
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
从类 java.lang.Object 继承的方法
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
从接口 java.util.concurrent.ExecutorService 继承的方法
awaitTermination, isShutdown, isTerminated, shutdown, shutdownNow
从接口 java.util.concurrent.Executor 继承的方法
newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable,T value)
为给定可运行任务和默认值返回一个 RunnableFuture。
参数:
runnable
- 将被包装的可运行任务
value
- 用于所返回的将来任务的默认值
返回:
一个 RunnableFuture,在运行的时候,它将运行底层可运行任务,作为 Future 任务,它将生成给定值作为其结果,并为底层任务提供取消操作。
newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
为给定可调用任务返回一个 RunnableFuture。
参数:
callable
- 将包装的可调用任务
返回:
一个 RunnableFuture,在运行的时候,它将调用底层可调用任务,作为 Future 任务,它将生成可调用的结果作为其结果,并为底层任务提供取消操作。
submit
public Future<?> submit(Runnable task)
从接口 ExecutorService
复制的描述
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在 成功 完成时将会返回 null。
指定者:
接口 ExecutorService
中的 submit
参数:
task
- 要提交的任务
返回:
表示任务等待完成的 Future
submit
public <T> Future<T> submit(Runnable task, T result)
从接口 ExecutorService
复制的描述
提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
指定者:
接口 ExecutorService
中的 submit
参数:
task
- 要提交的任务
result
- 返回的结果
返回:
表示任务等待完成的 Future
submit
public <T> Future<T> submit(Callable<T> task)
从接口 ExecutorService
复制的描述
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
如果想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式的构造。
注:Executors
类包括了一组方法,可以转换某些其他常见的类似于闭包的对象,例如,将 PrivilegedAction
转换为 Callable
形式,这样就可以提交它们了。
指定者:
接口 ExecutorService
中的 submit
参数:
task
- 要提交的任务
返回:
表示任务等待完成的 Future
invokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
从接口 ExecutorService
复制的描述
执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
指定者:
接口 ExecutorService
中的 invokeAny
参数:
tasks
- 任务 collection
返回:
某个任务返回的结果
抛出:
InterruptedException
- 如果等待时发生中断
ExecutionException
- 如果没有任务成功完成
invokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
从接口 ExecutorService
复制的描述
执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
指定者:
接口 ExecutorService
中的 invokeAny
参数:
tasks
- 任务 collection
timeout
- 最长等待时间
unit
- timeout 参数的时间单位
返回:
某个任务返回的结果
抛出:
InterruptedException
- 如果等待时发生中断
ExecutionException
- 如果没有任务成功完成
TimeoutException
- 如果在所有任务成功完成之前给定的超时期满
invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
从接口 ExecutorService
复制的描述
执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone()
为 true。注意,可以正常地或通过抛出异常来终止 已完成 任务。如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。
指定者:
接口 ExecutorService
中的 invokeAll
参数:
tasks
- 任务 collection
返回:
表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同,每个任务都已完成。
抛出:
InterruptedException
- 如果等待时发生中断,在这种情况下取消尚未完成的任务。
invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
从接口 ExecutorService
复制的描述
执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone()
为 true。一旦返回后,即取消尚未完成的任务。注意,可以正常地或通过抛出异常来终止 已完成 任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
指定者:
接口 ExecutorService
中的 invokeAll
参数:
tasks
- 任务 collection
timeout
- 最长等待时间
unit
- timeout 参数的时间单位
返回:
表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同。如果操作未超时,则已完成所有任务。如果确实超时了,则某些任务尚未完成。
抛出:
InterruptedException
- 如果等待时发生中断,在这种情况下取消尚未完成的任务