简介
构建一个新的现成是有一定代价的,因为涉及和操作系统的交互。如果程序中创建了大量的生命期很短的现成,应该使用线程池(Thread Pool)。一个线程池中包含许多准备运行的空闲线程。将Runnable对象交给线程池,就会有一个线程调用run方法。
当run方法退出时,线程不会死亡,而在池中准备为下一个请求提供服务。
执行器有很多静态共产方法用来创建线程池。例如newCachedThreadPool、newFixedThreadPool、newSingleThraedExecutor、newScheledTheadPool、newSingleThreadScheduleExecutor.
1、线程池
ExecutorService接口。接下来的一个例子是用途查看某个目录下的所有文件是否包含指定的关键字。并统计满足这些条件的文件的数量。每遇到一个文件夹就提交一个任务到线程池。
package ch14;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class MatchCounter implements Callable<Integer> {
private File fileDirectory;
private String keyWord;
// Thread pool
private ExecutorService pool;
private int count;
public MatchCounter(File fileDirectory, String keyWord, ExecutorService pool) {
this.fileDirectory = fileDirectory;
this.keyWord = keyWord;
this.pool = pool;
}
@Override
public Integer call() throws Exception {
count = 0;
try{
File[] files = fileDirectory.listFiles();
List<Future<Integer>> results = new ArrayList<>();
for (File file : files) {
// 如果是文件夹,继续调用
if(file.isDirectory()){
MatchCounter matchCounter = new MatchCounter(file, keyWord, pool);
// 提交任务
Future<Integer> result = pool.submit(matchCounter);
results.add(result);
}else{
// search this file content is has keyword. then count ++.
if(search(file)){
count ++;
}
}
}
// 统计结果
for (Future<Integer> result : results) {
try{
count += result.get();
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}catch (Exception e){
}
// 返回计算结果
return count;
}
// 搜索文件中是否包含关键字
public boolean search(File file){
try(Scanner in = new Scanner(file,"UTF-8")){
boolean founded = false;
while(!founded && in.hasNextLine()){
String line = in.nextLine();
if(line.contains(keyWord)){
founded = true;
}
}
return founded;
}catch (Exception e){
System.out.println(e.getMessage());
return false;
}
}
}
测试类
package ch14;
import java.io.File;
import java.util.Scanner;
import java.util.concurrent.*;
public class TheadPoolTest {
public static void main(String[] args) {
try (Scanner scanner = new Scanner(System.in)) {
System.out.println("输入查找目录:");
String dir = scanner.nextLine();
System.out.println("输入查找关键字:");
String keyword = scanner.nextLine();
// 申请线程池
ExecutorService pool = Executors.newCachedThreadPool();
// 创建对象
MatchCounter matchCounter = new MatchCounter(new File(dir), keyword, pool);
// 获取结果
Future<Integer> result = pool.submit(matchCounter);
try {
System.out.println(result.get() + " files matching the keywords: " + keyword);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 关闭线程池
pool.shutdown();
// 获取程序运行过程中线程池的最大线程数
int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
System.out.println("Largest pool size = " + largestPoolSize);
/**
* 输入查找目录:
* C:\Users\Administrator\Desktop\test
* 输入查找关键字:
* xx
*
* 3 files matching the keywords: xx
* Largest pool size = 2
*/
}
}
}
ExecutorService new CachedThreadPool 返回一个带缓存的线程池,该线程池在必要的时候创建线程,在线程空闲60秒之后终止线程;
ExecutorService newFixedThreadPool(int threads) 返回一个线程池,该池子中的线程数由参数指定;
ExecutorService newSginleThreadExecutor() 返回一个执行器,他在一个单个的线程中一次执行各个任务。
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
Future<T> submit(Runnable task)
提交指定的任务去执行,注意第一个方法可以指定返回结果,第二个方法可以执行完成任务时得到result。
void shutdown() 关闭服务,会先完成已经提交的任务而不再接受新的任务
int getLargestPoolSize() 返回线程池在该执行器生命周期中的最大尺寸。换句话说,就是获得该线程池创建一个历史线程数的最大值。
2、预定执行
ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。他是一种允许使用线程池机制的java.util.Timer的泛化。
Executors类的newSingleThreadExecutor
3、控制任务组
4、Fork Join框架
有些应用使用了大量的线程,但是很多都是空闲的。例如,一个web服务器可能会为每隔连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像处理或者视频处理。JAVA SE 7中新引入了fork-join框架,专门用来支持后一类应用。假设有一个处理任务,他可以很自然的分解为子任务,如下所示
if(问题规格 < 设定的阈值){
直接处理任务
}else{
分解该任务
递归的解决每一个子任务
组合结果
}
work stealing(工作觅取).
如下的实例用于在大小为10000的数组中找出大于0.5的数字的个数。
package ch14;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;
public class ForkCounter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 1000;
private double[] values;
private int from;
private int to;
private DoublePredicate filter;
public ForkCounter(double[] values, int from, int to, DoublePredicate filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
@Override
protected Integer compute() {
if(to - from < THRESHOLD){
int count = 0;
for (int i = from; i < to; i++) {
if(filter.test(values[i])){
count ++;
}
}
return count;
}else{
// 分解任务为2段
int mid = (from + to) / 2;
ForkCounter first = new ForkCounter(values,from,mid,filter);
ForkCounter second = new ForkCounter(values,mid,to,filter);
// 执行两个任务,返回所有任务的结果(Any则是其中一个就行)
invokeAll(first,second);
// 返回结果
return first.join() + second.join();
}
}
}
测试程序
package ch14;
import java.util.concurrent.ForkJoinPool;
public class ForkCounterTest {
public static void main(String[] args) {
final int SIZE = 10000;
double[] numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) {
numbers[i] = Math.random();
}
// 找出大于0.5的数
ForkCounter counter = new ForkCounter(numbers,0,numbers.length,x -> x>0.5);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(counter);
System.out.println(counter.join());
// 5026
}
}
5、可完成Future
thenApply
thenCompose
handle
theAccept
whenComplete
thenRun