Java高并发 -- J.U.C.组件扩展
FutureTask
Future模式,核心思想是异步调用。和同步调用的区别在于:如果某个任务A非常耗时,异步调用下,被调者可以立即返回,然后着手处理其他任务,不用在这个任务A上等待。等到真正需要任务A的结果了再尝试去获取。
Future模式,有点类似淘宝购物,假如买一部手机,从付款到收到货可能需要三天,这三天不需要傻傻等待,因为付款后会有一个订单,而这个订单是我买了这件东西、可以凭这个单号取东西的一个凭证。所以你一旦得到了订单,完全可以放下这件事取忙别的,到时候去拿快递就行了。Future模式的异步调用中,立即返回的一个值就类似于这里说的订单了(而不是你买的手机),然后就可以着手处理其他任务了,这就充分利用了等待时间。等其他任务处理完了,再根据拿到的这个类似订单的值,取到真实的数据(手机)。
FutureTask实现了RunnaleFuture接口,而RunnaleFuture实现了Runnale和Future接口。下面先以Future + Callable为例
package com.shy.concurrency.aqs;
import java.util.concurrent.*;
/**
* @author Haiyu
* @date 2019/1/4 10:28
*/
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> callable = () -> {
System.out.println("执行任务1");
Thread.sleep(2000);
return "Done!";
};
ExecutorService executorService = Executors.newSingleThreadExecutor();
long start = System.currentTimeMillis();
Future<String> future = executorService.submit(callable);
System.out.println("执行任务2");
// get方法会阻塞,直到计算完成才返回
Thread.sleep(2000);
String result = future.get();
long end = System.currentTimeMillis();
System.out.println(result);
System.out.println((end - start) / 1000);
executorService.shutdown();
}
}
Callable看成任务1,主线程中的任务看成任务2。可以看到Callable任务需要执行2秒,主线程中也需要耗时2秒。在submit后立即返回得到Future对象,让其执行同时主线程也可以执行自己的任务。两个任务同时执行,所以只花费了2s就执行完了两个任务。
下面再使用FutureTask,只需要稍微改动上面的程序即可。
package com.shy.concurrency.aqs;
import java.util.concurrent.*;
/**
* @author Haiyu
* @date 2019/1/4 10:28
*/
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> callable = () -> {
System.out.println("执行任务1");
Thread.sleep(2000);
return "Done!";
};
ExecutorService executorService = Executors.newSingleThreadExecutor();
long start = System.currentTimeMillis();
FutureTask<String> future = new FutureTask<>(callable);
executorService.submit(future);
System.out.println("执行任务2");
// get方法会阻塞,直到计算完成才返回
Thread.sleep(2000);
String result = future.get();
long end = System.currentTimeMillis();
System.out.println(result);
System.out.println((end - start) / 1000);
executorService.shutdown();
}
}
Fork/Join
fork也就是分支、分叉的意思,可以将大任务分解成小任务;join表示等待的意思,必须等待fork后的小任务执行完毕,得到执行后的部分结果,才能将部分结果合并成最终结果。
比如计算1到100的和,规定当区间小于10时可以直接计算,否则就分成左右两个分支,如此递归地将[1, 100]区间划分成若干个小区间;每个小区间计算部分和,等待这些部分和的结果都计算完毕,最后将其全部合并,得到最终的结果。
通常一个物理线程需要处理多个逻辑任务,所以每一个线程都有一个任务队列。若线程A的任务都执行完了,B还有很多任务没执行,此时A会“帮助”B执行它的任务,A帮助B执行B的任务时,从队列的尾部拿数据;而B自己执行任务时从队列头部拿数据,这就像是两个指针一个往左移动一个往右移动,避免了A、B之间对数据的竞争。
JDK中有ForkJoinPool,该接口有个方法public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
ForkJoinTask支持fork()
和join()
方法,它有两个重要的子类,没有返回值的RecursiveAction和有返回值的RecursiveTask,它们都有个方法compute()
,在这个方法中进行主要的计算。对于RecursiveAction来说签名是void,而对于RecursiveTask来说有返回值所以签名是<T>
下面以计算1~100的和为例
package com.shy.concurrency.aqs;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @author Haiyu
* @date 2019/1/4 10:49
*/
public class ForkJoinExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(new CountTask(1, 100));
int res = forkJoinTask.get();
System.out.println(res);
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
boolean canCompute = end - start < THRESHOLD;
if (canCompute) {
int sum = 0;
for (int i = start; i <= end ; i++) {
sum += i;
}
return sum;
} else {
int mid = (end - start) / 2 + start;
CountTask left = new CountTask(start, mid);
CountTask right = new CountTask(mid + 1, end);
left.fork();
right.fork();
int leftRes = left.join();
int rightRes = right.join();
return leftRes + rightRes;
}
}
}