我有一个由8个元素(flux)组成的Iterable构成的Flux.fromIterable(..)
对于每种通量排放量,我想异步调用一个方法。
我尝试了各种无法使用的dispatchOnpublishOn方法,最后我选择了map(CompletableFuture.supplyAsync(..), executor)flux转换为flux<CompletableFuture<Boolean>>

现在,我只想在最后一项完成时继续执行流程。
我尝试使用all(..)take(size of the Iterable),但是在两种情况下,流程在所有元素完成之前都将继续进行。
我认为这是因为我的执行器只有4个线程,并且将CompletableFuture -s添加到流量中需要花费一些时间。

为什么不all(..)take(8)等待助焊剂完成?
我该如何等待?

编码:

    Mono
    .fromFuture(dbUtil.getEntity(id))
    .doOnError(t -> {
        ...
        return;})
    .doOnSuccess(s -> log.info("Got it: " + s))
    .flatMap( s ->
        Flux.fromIterable(s.getItemsMap().entrySet())
            .map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
            .take(s.getItemsMap().entrySet().size())
    )
    .all(...)
    .consume(b -> done(b));

最佳答案

如果要与每个项目的计算并行进行,可以使用flatMap + just + subscribeOn + map。我对Reactor的调度程序类型不太熟悉,因此我将在RxJava中为您提供一个示例:

ExecutorService exec = ...
Scheduler scheduler = Schedulers.from(exec);

Observable.from(...)
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v)))
.subscribe(...);

关于java - flux.take(x)或flux.all(..)不会等待所有排放,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36166339/

10-14 09:54