顺便说一句,我还在学习weblux。
我不知道这是否可行,或者我使用了错误的方法,但考虑到这种并行变化。

Flux<String> enablers = Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential();


谁调用具有Webclient请求(service.getAMono)的方法

webClient.post()
              .uri(url)
              .headers(headers -> headers.addAll(httpHeaders))
              .body(BodyInserters.fromObject(request))
              .retrieve()
              .bodyToMono(entity2.class);


我需要等待启动器通量的流结束并处理其中的所有响应,原因是如果其中一个给我错误或否定响应,我将不会为阻滞剂运行其他并行磁通

Flux<String> blockers = Flux.fromIterable(blockersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.callAMono(string, entity, element))
                .sequential();


我虽然是关于“ zip”方法的,但是这个合并了两个响应,并不是我想要的
如果有人可以帮助我。

更新

enablers. //handle enablers response and if error return a custom Mono<response> with .reduce


如果enablers的句柄中没有错误,请与其他.thenMany一起进入Flux

最佳答案

我在第一个any中找到了有条件的flux方法,就像这样

Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential()
                .any(element -> *stuff here)//condition
                .flatMap(condition->{
                        if(condition.equals(Boolean.FALSE)){
                           return Flux.fromIterable(blockersList)
                                                   .parallel()
                                                   .runOn(Schedulers.elastic())
                                                   .flatMap(element -> service.callAMono(string, entity, element))
                                                   .sequential()
                                                   .reduce(**stuff here)// handle noError response and return;
                          }
                          return Mono.just(**stuff here);//handle error response and return
                 });


如果还有另一种方法,请把它发布在这里,谢谢您,:D

10-04 19:10