同步异步
计算机技术发展迅猛,不管是在软件还是硬件方面都发展的非常快,电脑的CPU也在更新换代,强劲的CPU可以承担更多的任务。如果程序一直使用同步编程的话,那么将会浪费CPU资源。举个列子,一个CPU有10个通道,如果所有程序都走一个通道,那么剩余9个通道都是空闲的,那这9个通道都浪费掉了。
如果使用异步编程,那么其它9个通道都可以利用起来了,程序的吞吐量也上来了。也就是说要充分利用CPU资源,使其忙碌起来,而异步编程无疑是让其忙碌的一种方式。
CompletableFuture
在CompletableFuture出来之前,我们可以用Future接口进行异步编程,Future配合线程池一起工作,它把任务交给线程池,线程池中处理完毕后通过Future.get()方法来获取结果,Future.get()可以理解为一个回调操作,在回调之前我们还可以做其他事情。
下面一个例子用来模拟小明借图书场景:
- 小明去图书馆借书
- 图书管理员找书(异步操作)
- 小明边玩手机边等待
- 小明拿到书
public class FutureTest extends TestCase {
// 申明一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
5,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public void testBook() {
String bookName = "《飘》";
System.out.println("小明去图书馆借书");
Future<String> future = threadPoolExecutor.submit(() -> {
// 模拟图书管理员找书花费时间
long minutes = (long) (Math.random() * 10) + 1;
System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName);
Thread.sleep((long) (Math.random() * 2000));
return bookName;
});
// 等待过程中做其他事情
this.playPhone();
try {
String book = future.get();
System.out.println("小明拿到了图书" + book);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private void playPhone() {
System.out.println("小明在玩手机等待图书");
}
}
这是一个典型的Future使用方式,其中future.get()
方法是阻塞的,程序运行会停留在这一行,我们的程序不可能一直等待下去,这个时候可以用future.get(long timeout, TimeUnit unit)
方法,给定一个等待时间,如果超过等待时间还是没有拿到数据则抛出一个TimeoutException异常,我们可以catch这个异常,然后对异常情况做出处理。
现在假设小明最多等待2分钟,那么代码可以这么写:
String book = future.get(2, TimeUnit.MINUTES);
现在有这么一种情况,假设图书管理员找到书本之后,还需要交给助理,让助理录入图书信息,录完信息才把书交给小明。助理录入的过程也是异步的,也就是说,我们要实现多个异步进行流水线这样的功能。
可以发现Future用来处理多异步流水线非常困难,这个时候CompletableFuture就派上用场了,CompletableFuture自带流水线特性,就好比Collection对应的Stream。
下面来看下CompletableFuture的基本用法,我们将上面的例子使用CompletableFuture实现:
public class CompletableFutureTest extends TestCase {
public void testBook() {
String bookName = "《飘》";
System.out.println("小明去图书馆借书");
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
// 模拟图书管理员找书花费时间
long minutes = (long) (Math.random() * 10) + 1;
System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName);
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
future.complete(bookName);
}).start();
// 等待过程中做其他事情
this.playPhone();
try {
String book = future.get();
System.out.println("小明拿到了图书" + book);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private void playPhone() {
System.out.println("小明在玩手机等待图书");
}
}
其中future.complete(bookName);
的意思是将结果返回,然后调用future.get()的地方就能获取到数据。
CompletableFuture还提供了一个静态方法CompletableFuture.supplyAsync(Supplier)
用来快速创建任务,参数Supplier用来返回任务结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟图书管理员找书花费时间
long minutes = (long) (Math.random() * 10) + 1;
System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书《飘》");
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "《飘》";
});
如果不需要返回结果可以使用CompletableFuture.runAsync(Runnable)
方法:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("running"))
接下来,我们使用CompletableFuture完成上面说到的需求:图书管理员找到书本之后,还需要交给助理,让助理录入图书信息
我们需要用到CompletableFuture.thenCompose(Function)
方法,用法如下
public void testBook3() {
String bookName = "《飘》";
System.out.println("小明去图书馆借书");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟图书管理员找书花费时间
long minutes = (long) (Math.random() * 10) + 1;
System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书"+bookName);
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return bookName;
})
// thenCompose,加入第二个异步任务
.thenCompose((book/*这里的参数是第一个异步返回结果*/) -> CompletableFuture.supplyAsync(()-> {
System.out.println("助理录入图书信息");
return book;
}));
// 等待过程中做其他事情
this.playPhone();
try {
String book = future.get();
System.out.println("小明拿到了图书" + book);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
thenApply(),thenAccept(),thenCompose(),thenCombine()
thenApply(Function)
的意思是对CompletableFuture返回结果做进一步处理,然后返回一个新的结果,它的参数使用的是Function,意味着可以使用lambda表达式,表达式会提供一个参数,然后需要一个返回结果。
public void testThenApply() throws ExecutionException, InterruptedException {
int i = 0;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> i + 1)
// 将相加后的结果转成字符串
// v就是上面i+1后的结果
// 等同于:.thenApply((v) -> String.valueOf(v))
.thenApply(String::valueOf);
String str = future.get();
System.out.println("String value: " + str);
}
如果不需要返回结果,可以用thenAccept(Consumer<? super T> action)
thenApply(),thenAccept()的区别是,thenApply提供参数,需要返回值,thenAccept只提供参数不需要返回值。
thenCompose()
的用法如下:
completableFuture1.thenCompose((completableFuture1_result) -> completableFuture2)
这段代码的意思是将completableFuture1中返回的结果带入到completableFuture2中去执行,然后返回completableFuture2中的结果,下面是一个简单的实例:
public void testThenCompose() throws ExecutionException, InterruptedException {
int i=0;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1)
.thenCompose((j) -> CompletableFuture.supplyAsync(() -> j + 2));
Integer result = future.get();
}
打印:
result:3
thenCombine()
方法是将两个CompletableFuture任务结果组合起来
public void testThenCombine() throws ExecutionException, InterruptedException {
int i=0;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1)
.thenCombine(CompletableFuture.supplyAsync(() -> i + 2), (result1, result2) -> {
System.out.println("第一个CompletableFuture结果:" + result1);
System.out.println("第二个CompletableFuture结果:" + result2);
return result1 + result2;
});
Integer total = future.get();
System.out.println("总和:" + total);
}
打印:
第一个CompletableFuture结果:1
第二个CompletableFuture结果:2
总和:3
CompletableFuture.join()
假设有一组CompletableFuture对象,现在需要这些CompletableFuture任务全部执行完毕,然后再接着做某些事情。针对这个需求,我们可以使用CompletableFuture.join()
方法。
public void testJoin() {
List<CompletableFuture> futures = new ArrayList<>();
System.out.println("100米跑步比赛开始");
for (int i = 0; i < 10; i++) {
final int num = i + 1;
futures.add(CompletableFuture.runAsync(() -> {
int v = (int)(Math.random() * 10);
try {
Thread.sleep(v);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(num + "号选手到达终点,用时:" + (10 + v) + "秒");
}));
}
CompletableFuture<Double>[] futureArr = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture.allOf(futureArr).join();
System.out.printf("所有选手到达终点");
}
打印:
100米跑步比赛开始
3号选手到达终点,用时:16秒
1号选手到达终点,用时:15秒
2号选手到达终点,用时:11秒
5号选手到达终点,用时:13秒
4号选手到达终点,用时:15秒
8号选手到达终点,用时:10秒
6号选手到达终点,用时:18秒
10号选手到达终点,用时:12秒
7号选手到达终点,用时:19秒
9号选手到达终点,用时:18秒
所有选手到达终点
whenComplete()
如果需要在任务处理完毕后做一些处理,可以使用whenComplete(BiConsumer)
或whenCompleteAsync(BiConsumer)
String bookName = "《飘》";
CompletableFuture<String> future = CompletableFuture.
supplyAsync(() -> {System.out.println("图书管理员开始找书");return bookName;})
.thenApply((book) -> {System.out.println("找到书本,助理开始录入信息"); return book;})
.whenCompleteAsync(((book, throwable) -> {
System.out.println("助理录入信息完毕,通知小明来拿书");
}));
String book = future.get();
System.out.println("小明拿到书" + book);
打印:
图书管理员开始找书
找到书本,助理开始录入信息
助理录入信息完毕,通知小明来拿书
小明拿到书《飘》
异常处理
对异常的处理通常分为两步,第一步抛出异常,第二步捕获异常,首先我们来看下CompletableFuture如何抛出异常。
CompletableFuture抛出异常有两种方式,第一种方式,如果CompletableFuture是直接new出来的对象,必须使用future.completeExceptionally(e)
抛出异常,如果采用 throw new RuntimeException(e);
方式抛出异常,调用者是捕获不到的。
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(()->{
String value = "http://";
try {
// 模拟出错,给一个不存在的ENCODE
value = URLEncoder.encode(value, "UTF-888");
future.complete(value);
} catch (UnsupportedEncodingException e) {
// future处理异常
future.completeExceptionally(e);
// !!此方式调用者无法捕获异常
// throw new RuntimeException(e);
}
}).start();
第二中方式,如果CompletableFuture对象是由工厂方法(如CompletableFuture.supplyAsync()
)创建的,可以直接throw new RuntimeException(e)
,因为supplyAsync()封装的方法内部做了try...catch处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String value = "http://";
try {
// 模拟出错,给一个不存在的ENCODE
value = URLEncoder.encode(value, "UTF-88");
return value;
} catch (UnsupportedEncodingException e) {
// 这样不行,可以throw
//future.completeExceptionally(e);
throw new RuntimeException(e);
}
});
接下来我们来看下如何捕获异常,捕获异常也分为两种。
第一种,在catch中捕获:
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
// 捕获异常1,首先会到这里来
System.out.println("捕获异常1,msg:" + e.getMessage());
}
第二种,在future.whenCompleteAsync()或future.whenComplete()方法中捕获:
future.whenCompleteAsync((value, e)->{
if (e != null) {
// 捕获异常2,这里也会打印
System.out.println("捕获异常2, msg:" + e.getMessage());
} else {
System.out.println("返回结果:" + value);
}
});
小结
CompletableFuture的出现弥补了Future接口在某些地方的不足,比如事件监听,多任务合并,流水线操作等。同时CompletableFuture配合lambda表达式让开发者使用起来更加方面,使得开发者在异步编程上多了一种选择。