我在相同的流量上使用publishOn vs subscriptionOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
虽然,当我同时使用两者时,日志中没有任何内容。
但是,当我只使用publishOn时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
是否建议比publishOn更好地使用publishOn?还是它比subscribeOn有更多的首选项?两者之间的区别是什么?何时使用?
最佳答案
我花了一些时间来理解它,也许是因为publishOn
通常在subscribeOn
之前解释,所以希望这是一个更简单的外行解释。subscribeOn
意味着在指定的调度程序工作程序(其他线程)上运行初始源发射,例如subscribe(), onSubscribe() and request()
,并且对于任何后续操作(例如onNext/onError/onComplete, map etc
)也是如此,无论subscribeOn()的位置如何,都会发生此行为
而且,如果您在流畅的调用中没有执行任何publishOn
,那么就是这样,所有操作都将在该线程上运行。
但是,只要您在中间调用publishOn()
,就可以立即在提供的调度程序工作程序上对此类publishOn()
运行任何后续的运算符调用。
这是一个例子
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();
结果将是
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5
如您所见,第一个
doOnNext()
和随后的map()
在名为subscribeOn_thread
的线程上运行,直到调用任何publishOn()
为止,然后任何后续调用都将在提供的调度程序上运行到该publishOn()
,并且对于任何后续调用都将再次发生,直到有人调用另一个publishOn()
。