• 这些弊端使得效率底下,根本无法接受。如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:

    int longest = 0;
    for(String str : strings){
        if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串
            int len = str.length();// 2. mapToInt(), 转换成长度
            longest = Math.max(len, longest);// 3. max(), 保留最长的长度
        }
    }

    采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。

    如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。

    Stream流水线解决方案

    我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:

    >> 操作如何记录

    注意这里使用的是“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。

    Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?-LMLPHP

    还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。

    图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOpStatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

    Stream流水线组织结构示意图如下:

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?-LMLPHP

  • >> 叠加之后的操作如何执行

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?-LMLPHP

    Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。

    是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。

    结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。

    结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。

    我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。

    但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?

    这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:

    // AbstractPipeline.wrapSink()
    // 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
    // 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        ...
        for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

    现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:

    // AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        ...
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
            spliterator.forEachRemaining(wrappedSink);// 迭代
            wrappedSink.end();// 通知遍历结束
        }
        ...
    }

    上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。

    >> 执行后的结果在哪里

    最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?

    // 错误的收集方式
    ArrayList<String> results = new ArrayList<>();
    stream.filter(s -> pattern.matcher(s).matches())
          .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
    // 正确的收集方式
    List<String>results =
         stream.filter(s -> pattern.matcher(s).matches())
                 .collect(Collectors.toList());  // No side-effects!

    回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。

    返回类型 对应的结束操作
    boolean anyMatch() allMatch() noneMatch()
    Optional findFirst() findAny()
    归约结果 reduce() collect()
    数组 toArray()

    结语

    本文详细介绍了Stream流水线的组织方式和执行过程,学习本文将有助于理解原理并写出正确的Stream代码,同时打消你对Stream API效率方面的顾虑。如你所见,Stream API实现如此巧妙,即使我们使用外部迭代手动编写等价代码,也未必更加高效。

    注:留下本文所用的JDK版本,以便有考究癖的人考证:

    $ java -version
    java version "1.8.0_101"
    Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
    Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
    
    
       

    本文分享自微信公众号 - 架构师修炼(jiagouxiulian)。
    如有侵权,请联系 [email protected] 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    08-30 20:20