关于How to skip even lines of a Stream obtained from the Files.lines问题,我遵循了公认的答案方法,基于filterEven()
接口(interface)实现了自己的Spliterator<T>
方法,例如:
public static <T> Stream<T> filterEven(Stream<T> src) {
Spliterator<T> iter = src.spliterator();
AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
{
@Override
public boolean tryAdvance(Consumer<? super T> action) {
iter.tryAdvance(item -> {}); // discard
return iter.tryAdvance(action); // use
}
};
return StreamSupport.stream(res, false);
}
我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src)
filterEven(res)
.map(line -> toDomainObject(line))
但是,相对于使用带有副作用的
filter()
的下一种方法来衡量这种方法的性能,我注意到下一种方法的效果更好:final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
.filter(line -> isEvenLine ())
.map(line -> toDomainObject(line))
我使用JMH测试了性能,但没有在基准测试中包含文件负载。我以前将其加载到数组中。然后,每个基准测试都从先前的数组创建
Stream<String>
开始,然后过滤偶数行,然后应用mapToInt()
提取int
字段的值,最后是max()
操作。这是基准测试之一(您可以检查整个Program
here,在这里您有data file with about 186 lines):@Benchmark
public int maxTempFilterEven(DataSource src){
Stream<String> content = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1); // Skip line: Not available
return filterEven(content) // Filter daily info and skip hourly
.mapToInt(line -> parseInt(line.substring(14, 16)))
.max()
.getAsInt();
}
我不明白为什么
filter()
方法具有比filterEven()
(〜50ops/ms)更好的性能(〜80ops/ms)? 最佳答案
简介
我想我知道原因,但是不幸的是,我不知道如何提高基于Spliterator
的解决方案的性能(至少在不重写整个Streams API功能的情况下)。
旁注1 :设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,则最有可能在不使用Stream API的情况下重新编写代码将使代码更快。 (例如,Stream API不可避免地会增加内存分配,从而增加GC压力)。另一方面,在大多数情况下,Stream API以相对较小的性能下降为代价提供了更好的高级API。
部分 1 或理论上的简短回答Stream
旨在实现一种内部迭代作为消耗的主要方式,而外部迭代(即基于Spliterator
的方式)是一种“模拟”的附加手段。因此,外部迭代会涉及一些开销。懒惰对外部迭代的效率增加了一些限制,并且由于需要支持flatMap
,因此有必要在此过程中使用某种动态缓冲区。
旁注2 在某些情况下,基于Spliterator
的迭代可能与内部迭代一样快(在这种情况下为filter
)。在直接从包含数据的Spliterator
直接创建Stream
的情况下尤其如此。要查看它,可以修改测试以将第一个过滤器具体化为String
s数组:
String[] filteredData = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1)
.toArray(String[]::new);
然后比较
maxTempFilter
和maxTempFilterEven
的性能,以接受该预先过滤的String[] filteredData
。如果您想知道为什么会这样,您可能应该阅读本长答案的其余部分,或者至少阅读第2部分。部分 2 或更长的理论答案:
流的设计主要是通过某些终端操作将其整体消耗掉。尽管被支持,但一个接一个的迭代元素并不是设计使用流的主要方式。
请注意,使用“功能性” Stream API(例如
map
,flatMap
,filter
,reduce
和collect
),您不能在某个步骤中说“我已经有足够的数据,不再遍历源并推送值”。您可以丢弃某些传入数据(就像filter
一样),但不能停止迭代。 (take
和skip
转换实际上是在内部使用Spliterator
来实现的; anyMatch
,allMatch
,noneMatch
,findFirst
,findAny
等使用非公共(public)API j.u.s.Sink.cancellationRequested
,它们也更容易使用,因为不能进行多个终端操作。)如果管道中的所有转换都是同步的,则可以将它们组合为一个聚合函数(Consumer
),然后在一个简单的循环中调用它(可以选择将循环执行拆分为多个线程)。这就是我简化的基于状态的过滤器所代表的含义(请参见中的代码,向我显示一些代码部分)。如果管道中有一个flatMap
,它会变得更加复杂,但是思路仍然相同。基于
Spliterator
的转换从根本上有所不同,因为它向管道增加了异步的,由消费者驱动的步骤。现在,Spliterator
而不是源Stream
驱动了迭代过程。如果您直接在源Spliterator
上请求Stream
,它也许可以返回一些实现,该实现只需对其内部数据结构进行迭代,这就是为什么实现预先过滤的数据应消除性能差异的原因。但是,如果您为某些非空管道创建Spliterator
,则除了要求源将元素逐一推送通过管道,直到某个元素通过所有过滤器之外,没有其他(简单)的选择(另请参见示例2中的第二个示例)。 ,向我显示一些部分代码)。源元素被逐个而不是分批推送的事实是使Stream
变得懒惰的基本决定的结果。需要缓冲区而不是仅一个元素是对flatMap
的支持的结果:从源中推送一个元素可以为Spliterator
生成许多元素。部分 3 或向我显示一些代码
本部分试图为“理论”部分中所描述的代码提供支持(包括链接到实际代码和模拟代码)。
首先,您应该知道当前的Streams API实现将非终端(中间)操作累积到单个惰性管道中(请参阅j.u.s.AbstractPipeline及其子类(例如j.u.s.ReferencePipeline)。然后,当应用了终端操作时,原始操作中的所有元素
Stream
通过管道“推送”。您看到的是两件事的结果:
里面有一个基于
Spliterator
的步骤。 OddLines
不是具有状态过滤器的代码或多或少类似于以下简单代码:
static int similarToFilter(String[] data)
{
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
int skip = 1;
boolean reduceEmpty = true;
int reduceState = 0;
for (String outerEl : data)
{
if (outerEl.charAt(0) != '#')
{
if (skip > 0)
skip--;
else
{
if (isEvenLine.test(outerEl))
{
int intEl = parseInt(outerEl.substring(14, 16));
if (reduceEmpty)
{
reduceState = intEl;
reduceEmpty = false;
}
else
{
reduceState = Math.max(reduceState, intEl);
}
}
}
}
}
return reduceState;
}
请注意,这实际上是一个内部包含一些计算(过滤/转换)的循环。
另一方面,当您在管道中添加
Spliterator
时,情况会发生很大变化,即使使用与实际发生的情况基本相似的简化代码,它也会变得更大,例如:interface Sp<T>
{
public boolean tryAdvance(Consumer<? super T> action);
}
static class ArraySp<T> implements Sp<T>
{
private final T[] array;
private int pos;
public ArraySp(T[] array)
{
this.array = array;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (pos < array.length)
{
action.accept(array[pos]);
pos++;
return true;
}
else
{
return false;
}
}
}
static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
private final Sp<T> sourceSp;
private final Predicate<T> filter;
private final ArrayList<T> buffer = new ArrayList<T>();
private int pos;
public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
{
this.sourceSp = sourceSp;
this.filter = filter;
}
@Override
public void accept(T t)
{
buffer.add(t);
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
while (true)
{
if (pos >= buffer.size())
{
pos = 0;
buffer.clear();
sourceSp.tryAdvance(this);
}
// failed to fill buffer
if (buffer.size() == 0)
return false;
T nextElem = buffer.get(pos);
pos++;
if (filter.test(nextElem))
{
action.accept(nextElem);
return true;
}
}
}
}
static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
private Sp<T> sourceSp;
public OddLineSp(Sp<T> sourceSp)
{
this.sourceSp = sourceSp;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (sourceSp == null)
return false;
sourceSp.tryAdvance(this);
if (!sourceSp.tryAdvance(action))
{
sourceSp = null;
}
return true;
}
@Override
public void accept(T t)
{
}
}
static class ReduceIntMax
{
boolean reduceEmpty = true;
int reduceState = 0;
public int getReduceState()
{
return reduceState;
}
public void accept(int t)
{
if (reduceEmpty)
{
reduceEmpty = false;
reduceState = t;
}
else
{
reduceState = Math.max(reduceState, t);
}
}
}
static int similarToSpliterator(String[] data)
{
ArraySp<String> src = new ArraySp<>(data);
int[] skip = new int[1];
skip[0] = 1;
WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
{
if (s.charAt(0) == '#')
return false;
if (skip[0] != 0)
{
skip[0]--;
return false;
}
return true;
});
OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
final ReduceIntMax reduceIntMax = new ReduceIntMax();
while (oddLines.tryAdvance(s ->
{
int intValue = parseInt(s.substring(14, 16));
reduceIntMax.accept(intValue);
})) ; // do nothing in the loop body
return reduceIntMax.getReduceState();
}
此代码较大,因为如果在循环内没有一些非平凡的有状态回调,则无法(或至少很难)表示逻辑。这里的
Sp
接口(interface)是j.u.s.Stream
和j.u.Spliterator
接口(interface)的混合体。ArraySp
表示Arrays.stream
的结果。 WrappingSp
与j.u.s.StreamSpliterators.WrappingSpliterator相似,后者在实际代码中表示针对任何非空管道的Spliterator
接口(interface)的实现,即,至少应用了一个中间操作的Stream
(请参阅j.u.s.AbstractPipeline.spliterator method)。在我的代码中,我将其与StatelessOp
子类合并,并放置负责filter
方法实现的逻辑。同样为了简单起见,我使用skip
实现了filter
。 OddLineSp
对应于您的OddLines
及其产生的Stream
ReduceIntMax
表示ReduceOps
的Math.max
终端操作int
那么在此示例中重要的是什么?这里重要的是,因为您首先过滤了原始流,所以
OddLineSp
是根据非空管道(即WrappingSp
)创建的。而且,如果您仔细观察WrappingSp
,您会注意到每次tryAdvance
都被调用时,它将调用委托(delegate)给sourceSp
并将结果累积到buffer
中。而且,由于管道中没有flatMap
,因此buffer
的元素将被一一复制。 IE。每次调用WrappingSp.tryAdvance
时,它将调用ArraySp.tryAdvance
,准确地返回一个元素(通过回调),并将其进一步传递给调用方提供的consumer
(除非该元素与过滤器不匹配,在这种情况下将再次调用ArraySp.tryAdvance
)再一次,但buffer
永远不会一次填充多个元素)。旁注3 :如果要查看实际代码,则最有趣的地方是j.u.s.StreamSpliterators.
WrappingSpliterator.tryAdvance
,它调用j.u.s.StreamSpliterators.
AbstractWrappingSpliterator.doAdvance
依次调用j.u.s.StreamSpliterators. AbstractWrappingSpliterator.fillBuffer
,j.u.s.StreamSpliterators. pusher
依次调用在ojit_a初始化的WrappingSpliterator.initPartialTraversalState
因此,影响性能的主要因素是复制到缓冲区中。
对于我们这些普通的Java开发人员来说,不幸的是,Stream API的当前实现几乎是封闭的,您不能仅使用继承或组合来修改内部行为的某些方面。
您可以使用基于反射的黑客手段,使针对特定情况的复制到缓冲区的效率更高,并获得一定的性能(但是牺牲了
Stream
的惰性),但是您不能完全避免这种复制,因此基于Spliterator
的代码将是反正比较慢回到第2条旁注的示例,基于
Spliterator
的包含具体化filteredData
的测试工作得更快,因为在WrappingSp
之前的管道中没有OddLineSp
,因此不会复制到中间缓冲区中。