顺便说一句,我还在学习weblux。
我不知道这是否可行,或者我使用了错误的方法,但考虑到这种并行变化。
Flux<String> enablers = Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential();
谁调用具有Webclient请求(service.getAMono)的方法
webClient.post()
.uri(url)
.headers(headers -> headers.addAll(httpHeaders))
.body(BodyInserters.fromObject(request))
.retrieve()
.bodyToMono(entity2.class);
我需要等待启动器通量的流结束并处理其中的所有响应,原因是如果其中一个给我错误或否定响应,我将不会为阻滞剂运行其他并行磁通
Flux<String> blockers = Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential();
我虽然是关于“ zip”方法的,但是这个合并了两个响应,并不是我想要的
如果有人可以帮助我。
更新
enablers. //handle enablers response and if error return a custom Mono<response> with .reduce
如果
enablers
的句柄中没有错误,请与其他.thenMany
一起进入Flux
最佳答案
我在第一个any
中找到了有条件的flux
方法,就像这样
Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential()
.any(element -> *stuff here)//condition
.flatMap(condition->{
if(condition.equals(Boolean.FALSE)){
return Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential()
.reduce(**stuff here)// handle noError response and return;
}
return Mono.just(**stuff here);//handle error response and return
});
如果还有另一种方法,请把它发布在这里,谢谢您,:D