考虑以下代码:
AtomicInteger counter1 = new AtomicInteger();
AtomicInteger counter2 = new AtomicInteger();
Flux<Object> source = Flux.generate(emitter -> {
emitter.next("item");
});
Executor executor1 = Executors.newFixedThreadPool(32);
Executor executor2 = Executors.newFixedThreadPool(32);
Flux<String> flux1 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
Flux<String> flux2 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor2)));
Flux.merge(flux1, flux2).subscribe(System.out::println);
您会看到一个发布者比另一个发布者快100倍。尽管如此,在运行代码时,似乎所有数据都已打印,但是两个发布者之间仍然存在巨大差距,这会增加超时时间。
有趣的是,当更改数字时,
executer2
将具有1024
线程,而executer1
将仅具有1
线程,那么我们仍然看到差距越来越大。我的期望是,在相应地调整了线程池之后,发布者将变得平衡。
我想在发布者之间取得平衡(相对于线程池大小和处理时间)
如果我等了足够长的时间会怎样?换句话说,会产生背压吗? (默认情况下,我猜这是运行时异常,对吗?)
我不想删除项目,也不想有运行时异常。相反,正如我所提到的,我希望系统在其资源和处理时间方面达到平衡-上面的代码是否可以保证这一点?
谢谢!
最佳答案
在此示例中,您的Flux
对象不是ParallelFlux
对象,因此它们只能使用一个线程。
创建一个能够处理数千个线程的调度程序并将其传递给Flux
对象之一并不重要-它们只是闲置在那儿,这正是本示例中正在发生的事情。没有背压,也不会导致异常-它的运行速度与使用一个线程一样快。
如果要确保Flux
充分利用了它可用的1024个线程,则需要调用.parallel(1024)
:
ParallelFlux<String> flux1 = Flux.merge(source).parallel(1).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(1);
return "1_" + counter1.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
ParallelFlux<String> flux2 = Flux.merge(source).parallel(1024).concatMap(item -> Mono.fromCallable(() -> {
Thread.sleep(100);
return "2_" + counter2.incrementAndGet();
}).subscribeOn(Schedulers.fromExecutor(executor1)));
如果对代码执行此操作,则
2_
航行经过1_
,尽管实际上它的睡眠时间是100倍,但您开始看到的结果与预期的结果非常接近:...
2_17075
2_17076
1_863
1_864
2_17077
1_865
2_17078
2_17079
...
但是,请注意:
我想在发布者之间取得平衡(相对于线程池大小和处理时间)
至少不能可靠地或以任何有意义的方式,您不能在此处选择数字来平衡输出-线程调度将完全是任意的。如果要这样做,则可以使用this variant of the subscribe method,从而允许您在订阅使用者上显式调用
request()
。这样,您可以通过仅请求准备处理的元素来提供背压。