基于阅读此问题:What's the difference between SubscribeOn and ObserveOnObserveOn设置执行​​Subscribe处理程序中代码的位置:stream.Subscribe(_ => { // this code here });SubscribeOn方法设置在哪个线程上完成流的设置。
我被领会到,如果未明确设置这些参数,则使用TaskPool。
现在我的问题是,让我说这样的事情:

Observable.Interval(new Timespan(0, 0, 1))
          .Where(t => predicate(t))
          .SelectMany(t => lots_of(t))
          .ObserveOnDispatcher()
          .Subscribe(t => some_action(t));
给定Where在调度程序上执行,那么在哪里执行predicate SelectManylots_of some_action

最佳答案

关于SubscribeOnObserveOn的信息有很多误导。

概要

  • SubscribeOn 拦截对的唯一方法IObservable<T> 的调用,即Subscribe,并在Dispose返回的IDisposable句柄上调用Subscribe
  • ObserveOn 拦截对 IObserver<T> 方法的调用,这些方法是OnNextOnCompletedOnError
  • 这两种方法都会导致在指定的调度程序上进行相应的调用。

  • 分析与示范

    该声明



    比帮助更令人困惑。您所说的“订阅处理程序”实际上是一个OnNext处理程序。请记住,SubscribeIObservable方法接受具有IObserverOnNextOnCompleted方法的OnError,但是扩展方法提供了方便的重载,它们接受lambda并为您构建了IObserver实现。

    不过,让我来用这个词吧;我认为“订阅处理程序”是在调用Subscribe时调用的可观察代码。这样,上面的描述更加类似于SubscribeOn的目的。

    SubscribeOn
    SubscribeOn导致可观察对象的Subscribe方法在指定的调度程序或上下文上异步执行。当您不想在正在运行的任何线程上的可观察对象上调用Subscribe方法时,可以使用它-通常是因为它可以长时间运行并且您不想阻塞调用线程。

    调用Subscribe时,您正在调用一个可观察对象,它可能是一连串可观察对象的一部分。只能观察到SubscribeOn对其起作用。现在可能是链中的所有可观察对象将立即在同一线程上被订阅-并非必须如此。例如,考虑Concat-仅在前一个流完成后才订阅每个连续流,通常,这将在前一个流称为OnCompleted的任何线程上进行。

    因此,SubscribeOn位于您对Subscribe的调用与您所订阅的可观察对象之间,以拦截该调用并使它异步。

    它还影响订阅的处置。 Subscribe返回用于取消订阅的IDisposable句柄。 SubscribeOn确保在提供的调度程序上调度对Dispose的调用。

    试图了解SubscribeOn的功能时,一个常见的混淆点是,可观察对象的Subscribe处理程序很可能在同一线程上调用OnNextOnCompletedOnError。但是,其目的不是影响这些调用。在Subscribe方法返回之前完成流的情况并不少见。例如,Observable.Return可以做到这一点。让我们来看看。

    如果使用我编写的Spy方法,并运行以下代码:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");
    

    您得到以下输出(线程ID当然可能有所不同):
    Calling from Thread: 1
    Return: Observable obtained on Thread: 1
    Return: Subscribed to on Thread: 1
    Return: OnNext(1) on Thread: 1
    Return: OnCompleted() on Thread: 1
    Return: Subscription completed.
    Subscribe returned
    

    您可以看到整个订阅处理程序在同一线程上运行,并在返回之前完成。

    让我们使用SubscribeOn异步运行它。我们将同时观察ReturnSubscribeOn:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    

    输出(我添加的行号):
    01 Calling from Thread: 1
    02 Return: Observable obtained on Thread: 1
    03 SubscribeOn: Observable obtained on Thread: 1
    04 SubscribeOn: Subscribed to on Thread: 1
    05 SubscribeOn: Subscription completed.
    06 Subscribe returned
    07 Return: Subscribed to on Thread: 2
    08 Return: OnNext(1) on Thread: 2
    09 SubscribeOn: OnNext(1) on Thread: 2
    10 Return: OnCompleted() on Thread: 2
    11 SubscribeOn: OnCompleted() on Thread: 2
    12 Return: Subscription completed.
    

    01-主要方法在线程1上运行。

    02-可观察到的Return在调用线程上进行评估。我们只是在这里获取IObservable,尚未订阅任何内容。

    03-在调用线程上评估SubscribeOn observable。

    04-现在,我们最终将SubscribeSubscribeOn方法调用。

    05-Subscribe方法异步完成...

    06-...,线程1返回到main方法。 这就是SubscribeOn的作用!

    07-同时,SubscribeOn在默认调度程序上调度了对Return的调用。在这里它在线程2上被接收。

    08-和Return一样,它在OnNext线程上调用Subscribe ...

    09-SubscribeOn现在只是一个传递。

    10,11-与OnCompleted相同

    12-最后,完成Return订阅处理程序。

    希望这可以清除SubscribeOn的目的和效果!

    观察

    如果您认为SubscribeOn是将调用传递到另一个线程的Subscribe方法的拦截器,则ObserveOn会执行相同的工作,但对于OnNextOnCompletedOnError调用。

    回想一下我们原来的例子:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");
    

    给出了以下输出:
    Calling from Thread: 1
    Return: Observable obtained on Thread: 1
    Return: Subscribed to on Thread: 1
    Return: OnNext(1) on Thread: 1
    Return: OnCompleted() on Thread: 1
    Return: Subscription completed.
    Subscribe returned
    

    现在让我们修改它以使用ObserveOn:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Return(1).Spy("Return");
    source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    

    我们得到以下输出:
    01 Calling from Thread: 1
    02 Return: Observable obtained on Thread: 1
    03 ObserveOn: Observable obtained on Thread: 1
    04 ObserveOn: Subscribed to on Thread: 1
    05 Return: Subscribed to on Thread: 1
    06 Return: OnNext(1) on Thread: 1
    07 ObserveOn: OnNext(1) on Thread: 2
    08 Return: OnCompleted() on Thread: 1
    09 Return: Subscription completed.
    10 ObserveOn: Subscription completed.
    11 Subscribe returned
    12 ObserveOn: OnCompleted() on Thread: 2
    

    01-主要方法在线程1上运行。

    02-和以前一样,可观察到的Return在调用线程上进行评估。我们只是在这里获取IObservable,尚未订阅任何内容。

    03-可观察到的ObserveOn也在调用线程上评估。

    04-现在我们再次在调用线程上订阅可观察到的ObserveOn ...

    05-...然后将 call 传递到可观察到的Return

    06-现在Return在其OnNext处理程序中调用Subscribe

    07-这是ObserveOn的效果。 我们可以看到OnNext是在线程2上异步调度的。

    08-同时Return在线程1上调用OnCompleted ...

    09-Return的订阅处理程序完成...

    10-然后ObserveOn的订阅处理程序也是如此...

    11-将控制权返回给main方法

    12-同时,ObserveOnReturnOnCompleted调用传递给了线程2。这可能在09-11期间的任何时间发生,因为它是异步运行的。碰巧的是,它现在终于被调用了。

    典型的用例是什么?

    当您需要将SubscribeOn转换为长时间运行的可观察对象并希望尽快离开调度程序线程时,通常会在GUI中看到Subscribe的使用-也许是因为您知道这是在订阅中完成所有工作的那些可观察对象之一处理程序。在可观察链的末尾应用它,因为这是您订阅时调用的第一个可观察物。

    当您要确保将ObserveOnOnNextOnCompleted调用编码回调度程序线程时,最经常会在GUI中看到OnError。在可观察链的末尾应用它,以尽可能晚地过渡回来。

    希望您能看到问题的答案是,ObserveOnDispatcher不会对执行WhereSelectMany的线程有任何影响-这完全取决于调用它们的线程流! stream的订阅处理程序将在调用线程上调用,但是如果不知道Where的实现方式,就无法说出SelectManystream的运行位置。

    生命周期超过订阅调用的可观察值

    到目前为止,我们一直只关注Observable.ReturnReturnSubscribe处理程序中完成其流。这不是典型的情况,但流超过Subscribe处理程序也很普遍。例如,查看Observable.Timer:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.Subscribe();
    Console.WriteLine("Subscribe returned");
    

    这将返回以下内容:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    Subscribe returned
    Timer: OnNext(0) on Thread: 2
    Timer: OnCompleted() on Thread: 2
    

    您可以清楚地看到订阅已完成,然后稍后在另一个线程上调用OnNextOnCompleted

    请注意,SubscribeOnObserveOn的组合不会对选择哪个线程或调度程序Timer对其调用OnNextOnCompleted产生任何影响。

    当然,您可以使用SubscribeOn来确定Subscribe线程:
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    

    (我故意在这里更改为NewThreadScheduler,以防止在Timer碰巧获得与oj​​it_code相同的线程池线程的情况下的混淆)

    给予:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    SubscribeOn: Observable obtained on Thread: 1
    SubscribeOn: Subscribed to on Thread: 1
    SubscribeOn: Subscription completed.
    Subscribe returned
    Timer: Subscribed to on Thread: 2
    Timer: Subscription completed.
    Timer: OnNext(0) on Thread: 3
    SubscribeOn: OnNext(0) on Thread: 3
    Timer: OnCompleted() on Thread: 3
    SubscribeOn: OnCompleted() on Thread: 3
    

    在这里,您可以清楚地看到线程(1)上的主线程在其SubscribeOn调用之后返回,但是Subscribe订阅获得了自己的线程(2),但是TimerOnNext调用在线程(3)上运行。

    现在,对于OnCompleted,让我们将代码更改为(对于代码中的后续代码,请使用nuget包rx-wpf):
    var dispatcher = Dispatcher.CurrentDispatcher;
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    

    这段代码有些不同。第一行确保我们有一个调度程序,我们还引入了ObserveOn,就像ObserveOnDispatcher一样,不同之处在于它指定我们应使用对ObserveOn进行评估的任何线程的DispatcherScheduler

    此代码提供以下输出:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    ObserveOn: Observable obtained on Thread: 1
    ObserveOn: Subscribed to on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    ObserveOn: Subscription completed.
    Subscribe returned
    Timer: OnNext(0) on Thread: 2
    ObserveOn: OnNext(0) on Thread: 1
    Timer: OnCompleted() on Thread: 2
    ObserveOn: OnCompleted() on Thread: 1
    

    请注意,调度程序(和主线程)是线程1。ObserveOnDispatcher仍在其选择的线程(2)上调用TimerOnNext-但是OnCompleted将编码回调用程序的调度程序线程,线程(1)。

    还要注意,如果我们要阻塞调度程序线程(用ObserveOnDispatcher表示),您会看到Thread.Sleep将阻塞(此代码在LINQPad main方法中效果最好):
    var dispatcher = Dispatcher.CurrentDispatcher;
    Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
    var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
    source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
    Console.WriteLine("Subscribe returned");
    Console.WriteLine("Blocking the dispatcher");
    Thread.Sleep(2000);
    Console.WriteLine("Unblocked");
    

    您将看到如下输出:
    Calling from Thread: 1
    Timer: Observable obtained on Thread: 1
    ObserveOn: Observable obtained on Thread: 1
    ObserveOn: Subscribed to on Thread: 1
    Timer: Subscribed to on Thread: 1
    Timer: Subscription completed.
    ObserveOn: Subscription completed.
    Subscribe returned
    Blocking the dispatcher
    Timer: OnNext(0) on Thread: 2
    Timer: OnCompleted() on Thread: 2
    Unblocked
    ObserveOn: OnNext(0) on Thread: 1
    ObserveOn: OnCompleted() on Thread: 1
    

    通过ObserveOnDispatcher进行的调用仅在ObserveOnDispatcher运行后才能发出。

    关键点

    请记住,Reactive Extensions本质上是一个自由线程库,它会尽其所能在其运行的线程上尽可能地保持懒惰-您必须故意干扰SleepObserveOn并将特定的调度程序传递给接受它们的运算符,这很有用改变这个。

    Observable的使用者无法做任何事情来控制其内部操作-SubscribeOnObserveOndecorators,它们包装了观察者和可观察对象的表面积以封送跨线程的调用。希望这些例子已经清楚了。

    10-08 05:04