我是反应式编程的新手。我看到可以压缩两个单声道以生成结果:
Mono<Info> info = Mono.just(id).map(this::getInfo).subscribeOn(Schedulers.parallel());
Mono<List<Detail>> detail= Mono.just(petitionRequest).map(this.service::getDetails)
.subscribeOn(Schedulers.parallel());
Flux<Generated> flux = Flux.zip(detail, info, (p, c) -> {
Generated o = Generated.builder().info(c).detail(p).build();
return o;
});
据我了解,这将两个调用并置,并生成当我调用
flux.blockFirst()
时生成的对象如何将另一个单声道合并到现有的两个单声道以生成结果? Flux.zip仅接受两个单声道。
提前致谢。
最佳答案
首先,由于要压缩Monos,因此使用Mono中的zip运算符代替Flux是有意义的。
它具有多个重载版本,可以接受任意数量的Monos。
另外,如果this.service::getDetails
和this::getInfo
阻止IO操作(HTTP请求,数据库调用等),则应使用弹性调度程序而不是并行调度程序,后者用于CPU密集型操作。
样例代码:
Mono<Info> info = Mono.just(id)
.map(this::getInfo)
.subscribeOn(Schedulers.elastic());
Mono<List<Detail>> detail= Mono.just(petitionRequest)
.map(this.service::getDetails)
.subscribeOn(Schedulers.elastic());
Mono<Description> description = Mono.just(id)
.map(this::callService)
.subscribe(Schedulers.elastic());
Mono.zip(info, detail, description)
.map(this::map);
private Generated map(Tuple3<Info, List<Detail>, Description> tuple3)
{
Info info = tuple3.getT1();
List<Detail> details = tuple3.getT2();
Description description = tuple3.getT3();
// build output here
}