同步异步

计算机技术发展迅猛,不管是在软件还是硬件方面都发展的非常快,电脑的CPU也在更新换代,强劲的CPU可以承担更多的任务。如果程序一直使用同步编程的话,那么将会浪费CPU资源。举个列子,一个CPU有10个通道,如果所有程序都走一个通道,那么剩余9个通道都是空闲的,那这9个通道都浪费掉了。

如果使用异步编程,那么其它9个通道都可以利用起来了,程序的吞吐量也上来了。也就是说要充分利用CPU资源,使其忙碌起来,而异步编程无疑是让其忙碌的一种方式。

CompletableFuture

在CompletableFuture出来之前,我们可以用Future接口进行异步编程,Future配合线程池一起工作,它把任务交给线程池,线程池中处理完毕后通过Future.get()方法来获取结果,Future.get()可以理解为一个回调操作,在回调之前我们还可以做其他事情。

下面一个例子用来模拟小明借图书场景:

  1. 小明去图书馆借书
  2. 图书管理员找书(异步操作)
  3. 小明边玩手机边等待
  4. 小明拿到书
public class FutureTest extends TestCase {
    // 申明一个线程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
            5,
            0,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    public void testBook() {
        String bookName = "《飘》";
        System.out.println("小明去图书馆借书");
        Future<String> future = threadPoolExecutor.submit(() -> {
            // 模拟图书管理员找书花费时间
            long minutes = (long) (Math.random() * 10) + 1;
            System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName);
            Thread.sleep((long) (Math.random() * 2000));
            return bookName;
        });
        // 等待过程中做其他事情
        this.playPhone();
        try {
            String book = future.get();
            System.out.println("小明拿到了图书" + book);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    private void playPhone() {
        System.out.println("小明在玩手机等待图书");
    }

}

这是一个典型的Future使用方式,其中future.get()方法是阻塞的,程序运行会停留在这一行,我们的程序不可能一直等待下去,这个时候可以用future.get(long timeout, TimeUnit unit)方法,给定一个等待时间,如果超过等待时间还是没有拿到数据则抛出一个TimeoutException异常,我们可以catch这个异常,然后对异常情况做出处理。

现在假设小明最多等待2分钟,那么代码可以这么写:

String book = future.get(2, TimeUnit.MINUTES);

现在有这么一种情况,假设图书管理员找到书本之后,还需要交给助理,让助理录入图书信息,录完信息才把书交给小明。助理录入的过程也是异步的,也就是说,我们要实现多个异步进行流水线这样的功能。

可以发现Future用来处理多异步流水线非常困难,这个时候CompletableFuture就派上用场了,CompletableFuture自带流水线特性,就好比Collection对应的Stream。

下面来看下CompletableFuture的基本用法,我们将上面的例子使用CompletableFuture实现:

public class CompletableFutureTest extends TestCase {

    public void testBook() {
        String bookName = "《飘》";
        System.out.println("小明去图书馆借书");
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟图书管理员找书花费时间
            long minutes = (long) (Math.random() * 10) + 1;
            System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName);
            try {
                Thread.sleep((long) (Math.random() * 2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete(bookName);
        }).start();
        // 等待过程中做其他事情
        this.playPhone();
        try {
            String book = future.get();
            System.out.println("小明拿到了图书" + book);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    private void playPhone() {
        System.out.println("小明在玩手机等待图书");
    }

}

其中future.complete(bookName);的意思是将结果返回,然后调用future.get()的地方就能获取到数据。

CompletableFuture还提供了一个静态方法CompletableFuture.supplyAsync(Supplier)用来快速创建任务,参数Supplier用来返回任务结果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟图书管理员找书花费时间
    long minutes = (long) (Math.random() * 10) + 1;
    System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书《飘》");
    try {
        Thread.sleep((long) (Math.random() * 2000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "《飘》";
});

如果不需要返回结果可以使用CompletableFuture.runAsync(Runnable)方法:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("running"))

接下来,我们使用CompletableFuture完成上面说到的需求:图书管理员找到书本之后,还需要交给助理,让助理录入图书信息

我们需要用到CompletableFuture.thenCompose(Function)方法,用法如下

public void testBook3() {
    String bookName = "《飘》";
    System.out.println("小明去图书馆借书");
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 模拟图书管理员找书花费时间
        long minutes = (long) (Math.random() * 10) + 1;
        System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书"+bookName);
        try {
            Thread.sleep((long) (Math.random() * 2000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return bookName;
    })
            // thenCompose,加入第二个异步任务
            .thenCompose((book/*这里的参数是第一个异步返回结果*/) -> CompletableFuture.supplyAsync(()-> {
        System.out.println("助理录入图书信息");
        return book;
    }));
    // 等待过程中做其他事情
    this.playPhone();
    try {
        String book = future.get();
        System.out.println("小明拿到了图书" + book);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

thenApply(),thenAccept(),thenCompose(),thenCombine()

thenApply(Function)的意思是对CompletableFuture返回结果做进一步处理,然后返回一个新的结果,它的参数使用的是Function,意味着可以使用lambda表达式,表达式会提供一个参数,然后需要一个返回结果。

public void testThenApply() throws ExecutionException, InterruptedException {
    int i = 0;
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> i + 1)
    // 将相加后的结果转成字符串
    // v就是上面i+1后的结果
    // 等同于:.thenApply((v) -> String.valueOf(v))
            .thenApply(String::valueOf);
    String str = future.get();
    System.out.println("String value: " + str);
}

如果不需要返回结果,可以用thenAccept(Consumer<? super T> action)

thenApply(),thenAccept()的区别是,thenApply提供参数,需要返回值,thenAccept只提供参数不需要返回值。

thenCompose()的用法如下:

completableFuture1.thenCompose((completableFuture1_result) -> completableFuture2)

这段代码的意思是将completableFuture1中返回的结果带入到completableFuture2中去执行,然后返回completableFuture2中的结果,下面是一个简单的实例:

public void testThenCompose() throws ExecutionException, InterruptedException {
        int i=0;
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1)
                .thenCompose((j) -> CompletableFuture.supplyAsync(() -> j + 2));
        Integer result = future.get();
    }

打印:

result:3

thenCombine()方法是将两个CompletableFuture任务结果组合起来

public void testThenCombine() throws ExecutionException, InterruptedException {
    int i=0;
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1)
            .thenCombine(CompletableFuture.supplyAsync(() -> i + 2), (result1, result2) -> {
                System.out.println("第一个CompletableFuture结果:" + result1);
                System.out.println("第二个CompletableFuture结果:" + result2);
                return result1 + result2;
            });
    Integer total = future.get();
    System.out.println("总和:" + total);
}

打印:

第一个CompletableFuture结果:1
第二个CompletableFuture结果:2
总和:3

CompletableFuture.join()

假设有一组CompletableFuture对象,现在需要这些CompletableFuture任务全部执行完毕,然后再接着做某些事情。针对这个需求,我们可以使用CompletableFuture.join()方法。

public void testJoin() {
    List<CompletableFuture> futures = new ArrayList<>();
    System.out.println("100米跑步比赛开始");
    for (int i = 0; i < 10; i++) {
        final int num = i + 1;
        futures.add(CompletableFuture.runAsync(() -> {
            int v =  (int)(Math.random() * 10);
            try {
                Thread.sleep(v);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(num + "号选手到达终点,用时:" + (10 + v) + "秒");
        }));
    }
    CompletableFuture<Double>[] futureArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture.allOf(futureArr).join();
    System.out.printf("所有选手到达终点");
}

打印:

100米跑步比赛开始
3号选手到达终点,用时:16秒
1号选手到达终点,用时:15秒
2号选手到达终点,用时:11秒
5号选手到达终点,用时:13秒
4号选手到达终点,用时:15秒
8号选手到达终点,用时:10秒
6号选手到达终点,用时:18秒
10号选手到达终点,用时:12秒
7号选手到达终点,用时:19秒
9号选手到达终点,用时:18秒
所有选手到达终点

whenComplete()

如果需要在任务处理完毕后做一些处理,可以使用whenComplete(BiConsumer)whenCompleteAsync(BiConsumer)

String bookName = "《飘》";
CompletableFuture<String> future = CompletableFuture.
        supplyAsync(() -> {System.out.println("图书管理员开始找书");return bookName;})
        .thenApply((book) -> {System.out.println("找到书本,助理开始录入信息"); return book;})
        .whenCompleteAsync(((book, throwable) -> {
            System.out.println("助理录入信息完毕,通知小明来拿书");
}));
String book = future.get();
System.out.println("小明拿到书" + book);

打印:

图书管理员开始找书
找到书本,助理开始录入信息
助理录入信息完毕,通知小明来拿书
小明拿到书《飘》

异常处理

对异常的处理通常分为两步,第一步抛出异常,第二步捕获异常,首先我们来看下CompletableFuture如何抛出异常。

CompletableFuture抛出异常有两种方式,第一种方式,如果CompletableFuture是直接new出来的对象,必须使用future.completeExceptionally(e)抛出异常,如果采用 throw new RuntimeException(e);方式抛出异常,调用者是捕获不到的。

CompletableFuture<String> future = new CompletableFuture<>();
new Thread(()->{
    String value = "http://";
    try {
        // 模拟出错,给一个不存在的ENCODE
        value = URLEncoder.encode(value, "UTF-888");
        future.complete(value);
    } catch (UnsupportedEncodingException e) {
        // future处理异常
        future.completeExceptionally(e);
        // !!此方式调用者无法捕获异常
        // throw new RuntimeException(e);
    }
}).start();

第二中方式,如果CompletableFuture对象是由工厂方法(如CompletableFuture.supplyAsync())创建的,可以直接throw new RuntimeException(e),因为supplyAsync()封装的方法内部做了try...catch处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String value = "http://";
    try {
        // 模拟出错,给一个不存在的ENCODE
        value = URLEncoder.encode(value, "UTF-88");
        return value;
    } catch (UnsupportedEncodingException e) {
        // 这样不行,可以throw
        //future.completeExceptionally(e);
        throw new RuntimeException(e);
    }
});

接下来我们来看下如何捕获异常,捕获异常也分为两种。

第一种,在catch中捕获:

try {
    future.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    // 捕获异常1,首先会到这里来
    System.out.println("捕获异常1,msg:" + e.getMessage());
}

第二种,在future.whenCompleteAsync()或future.whenComplete()方法中捕获:

future.whenCompleteAsync((value, e)->{
    if (e != null) {
        // 捕获异常2,这里也会打印
        System.out.println("捕获异常2, msg:" + e.getMessage());
    } else {
        System.out.println("返回结果:" + value);
    }
});

小结

CompletableFuture的出现弥补了Future接口在某些地方的不足,比如事件监听,多任务合并,流水线操作等。同时CompletableFuture配合lambda表达式让开发者使用起来更加方面,使得开发者在异步编程上多了一种选择。

05-29 11:59