关于How to skip even lines of a Stream obtained from the Files.lines问题,我遵循了公认的答案方法,基于filterEven()接口(interface)实现了自己的Spliterator<T>方法,例如:

public static <T> Stream<T> filterEven(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}

我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src)
filterEven(res)
     .map(line -> toDomainObject(line))

但是,相对于使用带有副作用的filter()的下一种方法来衡量这种方法的性能,我注意到下一种方法的效果更好:
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
     .filter(line -> isEvenLine ())
     .map(line -> toDomainObject(line))

我使用JMH测试了性能,但没有在基准测试中包含文件负载。我以前将其加载到数组中。然后,每个基准测试都从先前的数组创建Stream<String>开始,然后过滤偶数行,然后应用mapToInt()提取int字段的值,最后是max()操作。这是基准测试之一(您可以检查整个Program here,在这里您有data file with about 186 lines):
@Benchmark
public int maxTempFilterEven(DataSource src){
    Stream<String> content = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1);                       // Skip line: Not available
    return filterEven(content)              // Filter daily info and skip hourly
            .mapToInt(line -> parseInt(line.substring(14, 16)))
            .max()
            .getAsInt();
}

我不明白为什么filter()方法具有比filterEven()(〜50ops/ms)更好的性能(〜80ops/ms)?

最佳答案

简介

我想我知道原因,但是不幸的是,我不知道如何提高基于Spliterator的解决方案的性能(至少在不重写整个Streams API功能的情况下)。

旁注1 :设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,则最有可能在不使用Stream API的情况下重新编写代码将使代码更快。 (例如,Stream API不可避免地会增加内存分配,从而增加GC压力)。另一方面,在大多数情况下,Stream API以相对较小的性能下降为代价提供了更好的高级API。

部分 1 理论上的简短回答
Stream旨在实现一种内部迭代作为消耗的主要方式,而外部迭代(即基于Spliterator的方式)是一种“模拟”的附加手段。因此,外部迭代会涉及一些开销。懒惰对外部迭代的效率增加了一些限制,并且由于需要支持flatMap,因此有必要在此过程中使用某种动态缓冲区。

旁注2 在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(在这种情况下为filter)。在直接从包含数据的Spliterator直接创建Stream的情况下尤其如此。要查看它,可以修改测试以将第一个过滤器具体化为String s数组:

String[] filteredData = Arrays.stream(src.data)
            .filter(s-> s.charAt(0) != '#') // Filter comments
            .skip(1)
            .toArray(String[]::new);

然后比较maxTempFiltermaxTempFilterEven的性能,以接受该预先过滤的String[] filteredData。如果您想知道为什么会这样,您可能应该阅读本长答案的其余部分,或者至少阅读第2部分。

部分 2 更长的理论答案:

流的设计主要是通过某些终端操作将其整体消耗掉。尽管被支持,但一个接一个的迭代元素并不是设计使用流的主要方式。

请注意,使用“功能性” Stream API(例如mapflatMapfilterreducecollect),您不能在某个步骤中说“我已经有足够的数据,不再遍历源并推送值”。您可以丢弃某些传入数据(就像filter一样),但不能停止迭代。 (takeskip转换实际上是在内部使用Spliterator来实现的; anyMatchallMatchnoneMatchfindFirstfindAny等使用非公共(public)API j.u.s.Sink.cancellationRequested,它们也更容易使用,因为不能进行多个终端操作。)如果管道中的所有转换都是同步的,则可以将它们组合为一个聚合函数(Consumer),然后在一个简单的循环中调用它(可以选择将循环执行拆分为多个线程)。这就是我简化的基于状态的过滤器所代表的含义(请参见中的代码,向我显示一些代码部分)。如果管道中有一个flatMap,它会变得更加复杂,但是思路仍然相同。

基于Spliterator的转换从根本上有所不同,因为它向管道增加了异步的,由消费者驱动的步骤。现在,Spliterator而不是源Stream驱动了迭代过程。如果您直接在源Spliterator上请求Stream,它也许可以返回一些实现,该实现只需对其内部数据结构进行迭代,这就是为什么实现预先过滤的数据应消除性能差异的原因。但是,如果您为某些非空管道创建Spliterator,则除了要求源将元素逐一推送通过管道,直到某个元素通过所有过滤器之外,没有其他(简单)的选择(另请参见示例2中的第二个示例)。 ,向我显示一些部分代码)。源元素被逐个而不是分批推送的事实是使Stream变得懒惰的基本决定的结果。需要缓冲区而不是仅一个元素是对flatMap的支持的结果:从源中推送一个元素可以为Spliterator生成许多元素。

部分 3 向我显示一些代码

本部分试图为“理论”部分中所描述的代码提供支持(包括链接到实际代码和模拟代码)。

首先,您应该知道当前的Streams API实现将非终端(中间)操作累积到单个惰性管道中(请参阅j.u.s.AbstractPipeline及其子类(例如j.u.s.ReferencePipeline)。然后,当应用了终端操作时,原始操作中的所有元素Stream通过管道“推送”。

您看到的是两件事的结果:
  • 在您遇到的情况下,流管道不同的事实
    里面有一个基于Spliterator的步骤。
  • 您的OddLines不是
  • 管道中的第一步

    具有状态过滤器的代码或多或少类似于以下简单代码:
    static int similarToFilter(String[] data)
    {
        final int[] counter = {0};
        final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
        int skip = 1;
    
        boolean reduceEmpty = true;
        int reduceState = 0;
        for (String outerEl : data)
        {
            if (outerEl.charAt(0) != '#')
            {
                if (skip > 0)
                    skip--;
                else
                {
                    if (isEvenLine.test(outerEl))
                    {
                        int intEl = parseInt(outerEl.substring(14, 16));
                        if (reduceEmpty)
                        {
                            reduceState = intEl;
                            reduceEmpty = false;
                        }
                        else
                        {
                            reduceState = Math.max(reduceState, intEl);
                        }
                    }
                }
            }
        }
        return reduceState;
    }
    

    请注意,这实际上是一个内部包含一些计算(过滤/转换)的循环。

    另一方面,当您在管道中添加Spliterator时,情况会发生很大变化,即使使用与实际发生的情况基本相似的简化代码,它也会变得更大,例如:
    interface Sp<T>
    {
        public boolean tryAdvance(Consumer<? super T> action);
    }
    
    static class ArraySp<T> implements Sp<T>
    {
        private final T[] array;
        private int pos;
    
        public ArraySp(T[] array)
        {
            this.array = array;
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            if (pos < array.length)
            {
                action.accept(array[pos]);
                pos++;
                return true;
            }
            else
            {
                return false;
            }
        }
    }
    
    static class WrappingSp<T> implements Sp<T>, Consumer<T>
    {
        private final Sp<T> sourceSp;
        private final Predicate<T> filter;
    
        private final ArrayList<T> buffer = new ArrayList<T>();
        private int pos;
    
    
        public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
        {
            this.sourceSp = sourceSp;
            this.filter = filter;
        }
    
        @Override
        public void accept(T t)
        {
            buffer.add(t);
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            while (true)
            {
                if (pos >= buffer.size())
                {
                    pos = 0;
                    buffer.clear();
                    sourceSp.tryAdvance(this);
                }
                // failed to fill buffer
                if (buffer.size() == 0)
                    return false;
    
                T nextElem = buffer.get(pos);
                pos++;
                if (filter.test(nextElem))
                {
                    action.accept(nextElem);
                    return true;
                }
            }
        }
    }
    
    static class OddLineSp<T> implements Sp<T>, Consumer<T>
    {
        private Sp<T> sourceSp;
    
        public OddLineSp(Sp<T> sourceSp)
        {
            this.sourceSp = sourceSp;
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            if (sourceSp == null)
                return false;
    
            sourceSp.tryAdvance(this);
            if (!sourceSp.tryAdvance(action))
            {
                sourceSp = null;
            }
            return true;
    
        }
    
        @Override
        public void accept(T t)
        {
    
        }
    }
    
    static class ReduceIntMax
    {
        boolean reduceEmpty = true;
        int reduceState = 0;
    
        public int getReduceState()
        {
            return reduceState;
        }
    
        public void accept(int t)
        {
            if (reduceEmpty)
            {
                reduceEmpty = false;
                reduceState = t;
            }
            else
            {
                reduceState = Math.max(reduceState, t);
            }
        }
    }
    
    
    static int similarToSpliterator(String[] data)
    {
        ArraySp<String> src = new ArraySp<>(data);
    
        int[] skip = new int[1];
        skip[0] = 1;
        WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
        {
            if (s.charAt(0) == '#')
                return false;
            if (skip[0] != 0)
            {
                skip[0]--;
                return false;
            }
            return true;
        });
        OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
        final ReduceIntMax reduceIntMax = new ReduceIntMax();
        while (oddLines.tryAdvance(s ->
                                   {
                                       int intValue = parseInt(s.substring(14, 16));
                                       reduceIntMax.accept(intValue);
                                   })) ; // do nothing in the loop body
        return reduceIntMax.getReduceState();
    }
    

    此代码较大,因为如果在循环内没有一些非平凡的有状态回调,则无法(或至少很难)表示逻辑。这里的Sp接口(interface)是j.u.s.Streamj.u.Spliterator接口(interface)的混合体。
  • ArraySp表示Arrays.stream的结果。
  • WrappingSpj.u.s.StreamSpliterators.WrappingSpliterator相似,后者在实际代码中表示针对任何非空管道的Spliterator接口(interface)的实现,即,至少应用了一个中间操作的Stream(请参阅j.u.s.AbstractPipeline.spliterator method)。在我的代码中,我将其与StatelessOp子类合并,并放置负责filter方法实现的逻辑。同样为了简单起见,我使用skip实现了filter
  • OddLineSp对应于您的OddLines及其产生的Stream
  • ReduceIntMax表示ReduceOpsMath.max终端操作int

  • 那么在此示例中重要的是什么?这里重要的是,因为您首先过滤了原始流,所以OddLineSp是根据非空管道(即WrappingSp)创建的。而且,如果您仔细观察WrappingSp,您会注意到每次tryAdvance都被调用时,它将调用委托(delegate)给sourceSp并将结果累积到buffer中。而且,由于管道中没有flatMap,因此buffer的元素将被一一复制。 IE。每次调用WrappingSp.tryAdvance时,它将调用ArraySp.tryAdvance,准确地返回一个元素(通过回调),并将其进一步传递给调用方提供的consumer(除非该元素与过滤器不匹配,在这种情况下将再次调用ArraySp.tryAdvance)再一次,但buffer永远不会一次填充多个元素)。

    旁注3 :如果要查看实际代码,则最有趣的地方是j.u.s.StreamSpliterators. WrappingSpliterator.tryAdvance ,它调用
    j.u.s.StreamSpliterators. AbstractWrappingSpliterator.doAdvance 依次调用j.u.s.StreamSpliterators. AbstractWrappingSpliterator.fillBuffer j.u.s.StreamSpliterators. pusher 依次调用在ojit_a初始化的WrappingSpliterator.initPartialTraversalState
    因此,影响性能的主要因素是复制到缓冲区中。
    对于我们这些普通的Java开发人员来说,不幸的是,Stream API的当前实现几乎是封闭的,您不能仅使用继承或组合来修改内部行为的某些方面。
    您可以使用基于反射的黑客手段,使针对特定情况的复制到缓冲区的效率更高,并获得一定的性能(但是牺牲了Stream的惰性),但是您不能完全避免这种复制,因此基于Spliterator的代码将是反正比较慢

    回到第2条旁注的示例,基于Spliterator的包含具体化filteredData的测试工作得更快,因为在WrappingSp之前的管道中没有OddLineSp,因此不会复制到中间缓冲区中。

    09-12 10:51