本文介绍了对 Java 8 流进行分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何在 Java 8 Stream 上实现分区"操作?分区我的意思是,将流划分为给定大小的子流.不知何故,它与 Guava Iterators.partition() 方法,只是希望分区是惰性求值的 Streams 而不是 List 的.

How to implement "partition" operation on Java 8 Stream? By partition I mean, divide a stream into sub-streams of a given size. Somehow it will be identical to Guava Iterators.partition() method, just it's desirable that the partitions are lazily-evaluated Streams rather than List's.

推荐答案

将任意源流划分为固定大小的批次是不可能的,因为这会破坏并行处理.并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区.

It's impossible to partition the arbitrary source stream to the fixed size batches, because this will screw up the parallel processing. When processing in parallel you may not know how many elements in the first sub-task after the split, so you cannot create the partitions for the next sub-task until the first is fully processed.

然而,可以从随机访问List创建分区流.例如,在我的 StreamEx 库中可以使用此类功能:

However it is possible to create the stream of partitions from the random access List. Such feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

或者如果你真的想要流的流:

Or if you really want the stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

如果不想依赖第三方库,可以手动实现这样的ofSubLists方法:

If you don't want to depend on third-party libraries, you can implement such ofSubLists method manually:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

这个实现看起来有点长,但它考虑了一些极端情况,比如接近 MAX_VALUE 的列表大小.

This implementation looks a little bit long, but it takes into account some corner cases like close-to-MAX_VALUE list size.

如果您想要无序流的并行友好解决方案(因此您不关心哪些流元素将在单个批次中组合),您可以像这样使用收集器(感谢@sibnick 的启发):

If you want parallel-friendly solution for unordered stream (so you don't care which stream elements will be combined in single batch), you may use the collector like this (thanks to @sibnick for inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
                   Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

使用示例:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

结果:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

这种收集器是完全线程安全的,可以为顺序流生成有序的批次.

Such collector is perfectly thread-safe and produces ordered batches for sequential stream.

如果你想为每个批次应用一个中间转换,你可以使用以下版本:

If you want to apply an intermediate transformation for every batch, you may use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize,
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

例如,通过这种方式,您可以实时对每批中的数字求和:

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
            Collectors.toList()));

这篇关于对 Java 8 流进行分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 15:44