一、基础概念
- 行为参数化:提高代码可重用性。
- 方法引用
- lamda
- 函数式接口
- 前向兼容性支持
接口可添加默认方法
二、Stream流
定义及概述
- 数据源:执行一个查询
- 中间操作:形成一条流的流水线。构建器模式
- 筛选/filter
- 切片/limit:短路技巧
- skip
- 查找/find、findFirst、findAny
- 匹配/match、allMatch、anyMatch、noneMatch
- 映射/map、mapToDouble、mapToInt、mapToLong
- flatMap:将多个流扁平化为一个流
- 归约/reduce
- distinct
- 终端操作:执行流水线,并能生成结果
- collect(Collectors)
- count、max、min
- forEach
- allMatch、anyMatch、noneMatch
- findFirst、findAny
- ifPresent
- reduce
注意:何时使用findFirst和findAny?findFirst在并行上限制较多。如果不关心返回的元素是哪个,尽量使用findAny。
关键词:延迟算法、流只能使用一次、无限流、循环合并、有状态、有界、外部迭代、内部迭代、更方便的利用多核、并行处理能力、流优化、流特化、延迟和短路(按需计算)
构建流
获取流的四种方式: Collection Arrays Stream.of Stream.iterable
- 由值构建流
Stream.of(e1,e2,e3...)
Stream.empty() - 由数组构建流
Arrays.stream(numbers) - 由文件生成流
??? 由函数生成流:创建无限流
Stream.iterate和Stream.generate。中间操作
收集器
summarizingInt、summarizingDouble、summarizingLong、joining
- Collectors.reducing工厂方法是所有这些特殊情况的一般化。
- 初始值
- 转换函数
- 累积函数
- 分组
普通的单参数groupingBy(f)(其中f是分类函数)实际上是groupingBy(f, toList())的简便写法。
Collectors.collectingAndThen工厂方法返回的收集器。
Map<Dish.Type, Dish> mostCaloricByType =
menu.stream()
.collect(groupingBy(Dish::getType,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
- 分区:分组一种特殊情况 partitioningBy(Boolean)
流特化——数值流
- 映射到数值流
mapToInt
mapToDouble
mapToLong - 转换回对象流
boxed 默认值OptionalInt
OptionalInt
OptionalDouble
OptionalLong数值范围
IntStream、LongStream
range(右开)、rangeClosed(右闭)
三、并行数据处理与性能
并行流
- 使用建议:
- 避免共享可变状态.
- 保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。考虑流的操作流水线的总计算成本。单个元素通过流水线的处理成本较高意味着使用并行流时性能好的可能性比较大。
- 选择适当的数据结构往往比并行化算法更重要,使用正确的数据结构然后使其并行工作能够保证最佳的性能。考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。考虑终端操作中合并步骤的代价是大是小。
- 善用unordered方法来把有序流变成无序流。例如:你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。
- 并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
- 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
- 用适当的基准来检查其性能。
- 流自身的特点。有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。
一组计算操作只有最后一次的parallel或sequential生效。
配置并行流使用的线程池,默认为内核数。修改方法:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
分支/合并框架
并行流实现的基础架构正是分支/合并框架。
工作窃取算法:双向链式队列数据结构,实现线程负载均衡。
Spliterator接口
- Java 8已经为集合框架中包含的所有数据结构提供了一个默认的Spliterator实现。
- 定制自己的实现Spliterator。
四、组合式异步编程:CompletableFuture
ppt思路
一、为什么要有CompletableFuture
1. java8之前是怎么做的
1. Future接口(java7)
2. CompletableFuture接口(java8)
2. 有什么弊端
3. CompletableFuture能给我们带来什么
![](https://img2018.cnblogs.com/blog/1378450/201912/1378450-20191227171451645-1807237620.png)
1. 为用户提供异步API(server)
2. 异步调用一个同步缓慢的API(client)
3. 对多个异步任务进行流水线操作
4. ![](https://img2018.cnblogs.com/blog/1378450/201912/1378450-20191227171459625-1743411508.png)
二、怎么去使用CompletableFuture
1. 概述
2. API
1. supplyAsync
1. 异步
2. 错误处理
2. thenApply 与 thenCompose关系?
3. thenCompose:对两个异步操作进行流水线。
1. thenComposeAsync 通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行。而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。
4. thenCombine:将两个完全不相干的CompletableFuture对象的结果整合起来,不需要等到第一个任务完全结束才开始第二项任务。
1. thenCombineAsync
2. 有一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。
3. ![](https://img2018.cnblogs.com/blog/1378450/201912/1378450-20191227171516161-455644415.png)
4. java7的做法
5. ![](https://img2018.cnblogs.com/blog/1378450/201912/1378450-20191227171525430-553311131.png)
6. completion 事件
你会希望尽快将不同商店中的商品价格呈现给你的用户(这是车辆保险或者机票比价网站的典型需求),而不是像你之前那样,等所有的数据都完备之后再呈现。接下来的一节,你会了解如何通过响应CompletableFuture的completion事件实现这一功能(与此相反,调用get或者join方法只会造成阻塞,直到CompletableFuture完成才能继续往下运行)。
5. thenAccept
1. thenAcceptAsync
2. ![](https://img2018.cnblogs.com/blog/1378450/201912/1378450-20191227171534172-151231680.png)
6. 利用join取得它们的返回值
3. Hello World 以及高级使用(使用定制的执行器)
4. 注意事项:使用场景
示例需求
Future中进行的都是单次的操作。
对多个异步任务进行流水线操作。
- 创建一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。
- 在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
- 实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。
- 只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。如何以响应式的方式处理异步操作的完成事件!
三、 总结
并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map
这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助
你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
我们对使用这些API的建议如下。
❑如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实
现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要
创建比处理器核数更多的线程)。
❑反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用
CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者
W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的
流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。把流转换为并行流的方式,非常容易提升该程序的性能。但是在商店的数目增加时,扩展性不好。因为并行流Stream底层依赖的是线程数量固定的通用线程池。相反,你也知道,如果自
定义CompletableFutures调度任务执行的执行器能够更充分地利用CPU资源。
“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。thenCompose
- 等待Future集合中的所有任务都完成。allof
- 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。anyof
- 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。supplyAsyn
- 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。complete
小结
You got it !
- 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可
以改善程序的性能,加快程序的响应速度。runAsync - 你应该尽可能地为客户提供异步API。使用CompletableFuture类提供的特性,你能够
轻松地实现这一目标。supplyAsync - CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行
中发生的异常。exceptionally - 将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。
- 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这
些异步任务构造或者合并成一个。thenCompose/thenCombine - 你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的
结果可用时,针对性地执行一些程序。thenApply/thenAccept - 你可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表
中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。anyOf/allOf
五、LocalDate LocalTime & Optional
培训分享技巧总结
- 准备工作
- 关注受众了解程度
- 建议汇报方式——ppt
- 分享过程
- 多交互,调动受众兴趣
- 提问
- Lambda表达式的参数中需要声明类型吗?
- forEach是并行的吗?list.forEach 与 stream.forEach区别何在 ?
- 四大函数式接口之外,扩展有哪些,为什么要有这些扩展? 原始类型特质化
- 分享后
- 总结