我想在Flux上执行移动窗口计算,并生成包含计算值的Flux,但是我无法理解如何完成此操作。
举一个简化的例子,假设我有一个整数通量,并且我想产生一个新的通量,该通量中每三个连续的整数之和。为了显示:
第一通量包含1到8的整数:{1、2、3、4、5、6、7、8}
结果通量应包含和:{1 + 2 + 3、2 + 3 + 4、3 + 4 + 5、4 + 5 + 6、5 + 6 + 7、6 + 7 + 8}
我可以轻松产生第一个通量,并得出包含连续3个值的通量通量,如下所示:
Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);
我还可以对f2进行subscription()并计算总和,但是我不知道如何同时将这些总和作为新的Flux发布。
我是否错过了一些简单的事情,还是实际上很难做到?
最佳答案
您可以在内部通量上使用.reduce(Integer::sum)
来执行窗口中元素的总和,在外部通量上使用.flatMap
可以将这些总和合并回到单个流中。
请注意,由于.window
是用maxSize < skip
调用的,因此尾随窗口中的项目少于最大大小。
Flux<Integer> sums = Flux.range(1, 8) // Flux<Integer>
.window(3, 1) // Flux<Flux<Integer>>
.flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>
StepVerifier.create(sums)
.expectNext(6) // 1+2+3
.expectNext(9) // 2+3+4
.expectNext(12) // 3+4+5
.expectNext(15) // 4+5+6
.expectNext(18) // 5+6+7
.expectNext(21) // 6+7+8
.expectNext(15) // 7+8
.expectNext(8) // 8
.verifyComplete();