我想将一个通量分成两个通量,其中第一个通量具有原始通量的第一项,第二个通量将保留其余的通量。
在每个通量上应用自定义转换myLogic
后,我想将它们合并为一个通量,以保留原始通量的顺序。
例子:
S:学生
S':申请myLogic
后的学生
发射通量:s1 -> s2 -> s3 -> s4
第一个拆分的流量:s1' => myLogic
第二个拆分的通量:s2' -> s3' -> s4' => myLogic
组合通量:s1' -> s2' -> s3' -> s4'
最佳答案
使用标准的Flux
方法take
和skip
分隔头和尾元素就足够了。在此之前调用cache
对避免订阅重复也很有用。
class Util {
static <T, V> Flux<V> dualTransform(
Flux<T> originalFlux,
int cutpointIndex,
Function<T, V> transformHead,
Function<T, V> transformTail
) {
var cached = originalFlux.cache();
var head = cached.take(cutpointIndex).map(transformHead);
var tail = cached.skip(cutpointIndex).map(transformTail);
return Flux.concat(head, tail);
}
static void test() {
var sample = Flux.just("a", "b", "c", "d");
var result = dualTransform(
sample,
1,
x -> "{" + x.toUpperCase() + "}",
x -> "(" + x + ")"
);
result.doOnNext(System.out::print).subscribe();
// prints: {A}(b)(c)(d)
}
}