我想在Flux上执行移动窗口计算,并生成包含计算值的Flux,但是我无法理解如何完成此操作。

举一个简化的例子,假设我有一个整数通量,并且我想产生一个新的通量,该通量中每三个连续的整数之和。为了显示:

第一通量包含1到8的整数:{1、2、3、4、5、6、7、8}

结果通量应包含和:{1 + 2 + 3、2 + 3 + 4、3 + 4 + 5、4 + 5 + 6、5 + 6 + 7、6 + 7 + 8}

我可以轻松产生第一个通量,并得出包含连续3个值的通量通量,如下所示:

Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);


我还可以对f2进行subscription()并计算总和,但是我不知道如何同时将这些总和作为新的Flux发布。

我是否错过了一些简单的事情,还是实际上很难做到?

最佳答案

您可以在内部通量上使用.reduce(Integer::sum)来执行窗口中元素的总和,在外部通量上使用.flatMap可以将这些总和合并回到单个流中。

请注意,由于.window是用maxSize < skip调用的,因此尾随窗口中的项目少于最大大小。

Flux<Integer> sums = Flux.range(1, 8)                    // Flux<Integer>
        .window(3, 1)                                    // Flux<Flux<Integer>>
        .flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>

StepVerifier.create(sums)
        .expectNext(6)  // 1+2+3
        .expectNext(9)  // 2+3+4
        .expectNext(12) // 3+4+5
        .expectNext(15) // 4+5+6
        .expectNext(18) // 5+6+7
        .expectNext(21) // 6+7+8
        .expectNext(15) // 7+8
        .expectNext(8)  // 8
        .verifyComplete();

09-28 01:43