我在相同的流量上使用publishOn vs subscriptionOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

虽然,当我同时使用两者时,日志中没有任何内容。
但是,当我只使用publishOn时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

是否建议比publishOn更好地使用publishOn?还是它比subscribeOn有更多的首选项?两者之间的区别是什么?何时使用?

最佳答案

我花了一些时间来理解它,也许是因为publishOn通常在subscribeOn之前解释,所以希望这是一个更简单的外行解释。
subscribeOn意味着在指定的调度程序工作程序(其他线程)上运行初始源发射,例如subscribe(), onSubscribe() and request(),并且对于任何后续操作(例如onNext/onError/onComplete, map etc)也是如此,无论subscribeOn()的位置如何,都会发生此行为

而且,如果您在流畅的调用中没有执行任何publishOn,那么就是这样,所有操作都将在该线程上运行。

但是,只要您在中间调用publishOn(),就可以立即在提供的调度程序工作程序上对此类publishOn()运行任何后续的运算符调用。

这是一个例子

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

结果将是

1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5


如您所见,第一个doOnNext()和随后的map()在名为subscribeOn_thread的线程上运行,直到调用任何publishOn()为止,然后任何后续调用都将在提供的调度程序上运行到该publishOn(),并且对于任何后续调用都将再次发生,直到有人调用另一个publishOn()

10-06 05:46