考虑以下代码:

AtomicInteger counter1 = new AtomicInteger();
AtomicInteger counter2 = new AtomicInteger();

Flux<Object> source = Flux.generate(emitter -> {
    emitter.next("item");
});

Executor executor1 = Executors.newFixedThreadPool(32);
Executor executor2 = Executors.newFixedThreadPool(32);

Flux<String> flux1 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
        Thread.sleep(1);
        return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));

Flux<String> flux2 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
    Thread.sleep(100);
    return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor2)));

Flux.merge(flux1, flux2).subscribe(System.out::println);


您会看到一个发布者比另一个发布者快100倍。尽管如此,在运行代码时,似乎所有数据都已打印,但是两个发布者之间仍然存在巨大差距,这会增加超时时间。

有趣的是,当更改数字时,executer2将具有1024线程,而executer1将仅具有1线程,那么我们仍然看到差距越来越大。

我的期望是,在相应地调整了线程池之后,发布者将变得平衡。


我想在发布者之间取得平衡(相对于线程池大小和处理时间)
如果我等了足够长的时间会怎样?换句话说,会产生背压吗? (默认情况下,我猜这是运行时异常,对吗?)


我不想删除项目,也不想有运行时异常。相反,正如我所提到的,我希望系统在其资源和处理时间方面达到平衡-上面的代码是否可以保证这一点?

谢谢!

最佳答案

在此示例中,您的Flux对象不是ParallelFlux对象,因此它们只能使用一个线程。

创建一个能够处理数千个线程的调度程序并将其传递给Flux对象之一并不重要-它们只是闲置在那儿,这正是本示例中正在发生的事情。没有背压,也不会导致异常-它的运行速度与使用一个线程一样快。

如果要确保Flux充分利用了它可用的1024个线程,则需要调用.parallel(1024)

ParallelFlux<String> flux1 = Flux.merge(source).parallel(1).concatMap(item -> Mono.fromCallable(() -> {
    Thread.sleep(1);
    return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));

ParallelFlux<String> flux2 = Flux.merge(source).parallel(1024).concatMap(item -> Mono.fromCallable(() -> {
    Thread.sleep(100);
    return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));


如果对代码执行此操作,则2_航行经过1_,尽管实际上它的睡眠时间是100倍,但您开始看到的结果与预期的结果非常接近:

...
2_17075
2_17076
1_863
1_864
2_17077
1_865
2_17078
2_17079
...


但是,请注意:


  我想在发布者之间取得平衡(相对于线程池大小和处理时间)


至少不能可靠地或以任何有意义的方式,您不能在此处选择数字来平衡输出-线程调度将完全是任意的。如果要这样做,则可以使用this variant of the subscribe method,从而允许您在订阅使用者上显式调用request()。这样,您可以通过仅请求准备处理的元素来提供背压。

10-06 05:46