我是反应式编程的新手。我看到可以压缩两个单声道以生成结果:

Mono<Info> info = Mono.just(id).map(this::getInfo).subscribeOn(Schedulers.parallel());

Mono<List<Detail>> detail= Mono.just(petitionRequest).map(this.service::getDetails)
                    .subscribeOn(Schedulers.parallel());

Flux<Generated> flux = Flux.zip(detail, info, (p, c) -> {
            Generated o = Generated.builder().info(c).detail(p).build();
            return o;
        });


据我了解,这将两个调用并置,并生成当我调用flux.blockFirst()时生成的对象

如何将另一个单声道合并到现有的两个单声道以生成结果? Flux.zip仅接受两个单声道。

提前致谢。

最佳答案

首先,由于要压缩Monos,因此使用Mono中的zip运算符代替Flux是有意义的。

它具有多个重载版本,可以接受任意数量的Monos。

另外,如果this.service::getDetailsthis::getInfo阻止IO操作(HTTP请求,数据库调用等),则应使用弹性调度程序而不是并行调度程序,后者用于CPU密集型操作。

样例代码:

    Mono<Info> info = Mono.just(id)
                          .map(this::getInfo)
                          .subscribeOn(Schedulers.elastic());

    Mono<List<Detail>> detail= Mono.just(petitionRequest)
                                   .map(this.service::getDetails)
                                   .subscribeOn(Schedulers.elastic());

    Mono<Description> description = Mono.just(id)
                                        .map(this::callService)
                                        .subscribe(Schedulers.elastic());

    Mono.zip(info, detail, description)
        .map(this::map);

    private Generated map(Tuple3<Info, List<Detail>, Description> tuple3)
    {
        Info info = tuple3.getT1();
        List<Detail> details = tuple3.getT2();
        Description description = tuple3.getT3();

        // build output here
    }

10-08 17:28