本文介绍了ObserveOn和SubscribeOn - 那里的工作正在做的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 根据阅读这个问题:What's SubscribeOn之间的差和ObserveOn 的 ObserveOn 设置,其中code是在订阅处理程序执行的: stream.Subscribe(_ =&GT; {//这个code在这里}); 哪个线程流的设置上做的 SubscribeOn 方法集。我导致明白,如果这些都没有显式地设置,则TaskPool被使用。现在我的问题是,可以说我做这样的事情: Observable.Interval(新的时间跨度(0,0,1))其中。(T =&GT; predicate(T))的SelectMany(T =&GT; lots_of(T))。 。.ObserveOnDispatcher()订阅(T =&GT; some_action(T)); 在哪里其中, predicate 和的SelectMany lots_of 被执行,因为 some_action 上的调度员正在执行?解决方案 有很多误导性的信息在那里大约 SubscribeOn 和 ObserveOn 。摘要 SubscribeOn 拦截调用的唯一方法 的IObservable&LT; T&GT; ,这是订阅,并调用处置在 IDisposable的句柄订阅。 ObserveOn 拦截调用的方法 IObserver&LT; T&GT; ,它们是 OnNext , OnCompleted &放大器; 的OnError 。在这两种方法会导致要在指定的调度做了相应的要求。 分析和放大器;演示语句时的困惑多于帮助。你所提到的订阅处理器是一个真正的 OnNext 处理程序。记住,订阅的方法的IObservable 接受一个 IObserver 的有 OnNext , OnCompleted 和的OnError 方法,但它是提供了方便的重载接受lambda表达式,并建立一个 IObserver 实现你的扩展方法。让我适当的术语虽然;我认为订阅处理器作为code的中观察到的的被调用时,订阅被称为。以这种方式,以上描述更像的目的 SubscribeOn SubscribeOn SubscribeOn 引起要在指定的调度程序或上下文异步执行的可观察的订阅方法。您可以使用它时,你不想叫订阅从任何线程在运行上可观察到的方法 - 通常是因为它可以长时间运行而你不知道吨要阻塞调用线程。当你调用订阅,你调用一个可观察的,可能是一个长期的观测值链的一部分。这是唯一可观测的 SubscribeOn 被应用到它的效果。现在它可能是所有链中的观测将被立即并在同一线程订阅的情况下 - 但它不必是这种情况。想想 Concat的例如 - 只订阅了每个连续的数据流一旦preceding流已经完成,通常这会发生什么线程preceding上流中调用 OnCompleted 从。所以 SubscribeOn 您的通话之间坐到订阅和你订阅,拦截通话,使可观测台异步它这也影响了处置订阅。 订阅返回的IDisposable 处理这是用来退订。 SubscribeOn 确保呼叫处置计划所提供的调度。混乱试图理解当一个共通点是什么 SubscribeOn 确实是可观察的订阅处理程序可能调用 OnNext , OnCompleted 或的OnError 在此同一线程。然而,它的目的不是要影响这些调用。这是很平常的流来完成在订阅方法返回。 Observable.Return 做到这一点,例如。让我们一起来看看。如果您使用的Spy方法我写的,并运行以下code: Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Return(1).Spy(返回);source.Subscribe();Console.WriteLine(订阅返回); 您得到这个输出(当然线程ID可能会有所不同):从螺纹 电话:1返回:对线程观测获得:1返回:已订阅的主题:1返回:OnNext(1)主题:1返程:OnCompleted()上的主题:1返回:认购完成。订阅返回 您可以看到,整个预订处理器运行在同一个线程,并在返回之前完成。让我们用 SubscribeOn 异步运行这一点。我们将刺探的收益可观察和SubscribeOn观察到的两个: Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Return(1).Spy(返回);source.SubscribeOn(Scheduler.Default).Spy(SubscribeOn)订阅()。Console.WriteLine(订阅返回); 这个输出(添加由我行号): 01的线程中调用:102回归 - 线程观测获得:103 SubscribeOn:在线程观测获得:104 SubscribeOn:上订阅了主题:105 SubscribeOn:认购完成。06订阅返回07回:上订阅了主题:208返回:OnNext(1)主题:209 SubscribeOn:OnNext(1)主题:210返回:OnCompleted()在主题:211 SubscribeOn:OnCompleted()在主题:212返回:认购完成。 01 - 的主要方法是在线程1运行 02 - 返回观察到的调用线程进行评估。我们刚开的IObservable 在这里,没有什么是订阅呢。 03 - 的SubscribeOn观察到的被评为调用线程。 04 - 现在最后我们称之为订阅 SubscribeOn方法 05 - 的订阅方法异步完成... 06 - ...和线程1返回到主法。 这是SubscribeOn在行动的效果! 07 - 同时, SubscribeOn 定呼叫上的默认调度为返回。在这里,它收到了线程2。 08 - 正如返回呢,它会调用 OnNext 的订阅线程... 09 - 和 SubscribeOn 只是一个闯关现在 10,11 - 同样为 OnCompleted 12 - 而在去年所有的返回订阅处理程序完成。希望这清除了的目的和效果 SubscribeOn ! ObserveOn 如果你觉得 SubscribeOn 作为拦截器的订阅方法,该方法将调用传递到不同的线程,那么 ObserveOn 做同样的工作,但对于 OnNext , OnCompleted 和的OnError 调用。回想一下我们最初的例子: Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Return(1).Spy(返回);source.Subscribe();Console.WriteLine(订阅返回); 这给了这样的输出:从螺纹 电话:1返回:对线程观测获得:1返回:已订阅的主题:1返回:OnNext(1)主题:1返程:OnCompleted()上的主题:1返回:认购完成。订阅返回 现在,让我们改变这种使用 ObserveOn : Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Return(1).Spy(返回);source.ObserveOn(Scheduler.Default).Spy(ObserveOn)订阅()。Console.WriteLine(订阅返回); 我们得到以下的输出: 01的线程中调用:102回归 - 线程观测获得:103 ObserveOn:在线程观测获得:104 ObserveOn:上订阅了主题:105返回:已订阅的主题:106返回:OnNext(1)主题:107 ObserveOn:OnNext(1)主题:208返回:OnCompleted()上的主题:109返回:认购完成。10 ObserveOn:认购完成。11订阅返回12 ObserveOn:OnCompleted()在主题:2 01 - 的主要方法是在线程1运行 02 - 和以前一样,返回观察到的被评为调用线程。我们刚开的IObservable 在这里,没有什么是订阅呢。 03 - 的 ObserveOn 观察到的是调用线程评价过 04 - 现在我们订阅,再调用线程上,第一至ObserveOn观察到的... 05 - ...。然后通过对通呼叫返回观察到 06 - 现在返回电话 OnNext 在它的订阅处理程序 07 - 这是 ObserveOn 的效果我们可以看到, OnNext 是异步定于线程2。 08 - 同时返回电话 OnCompleted 在线程1 ... 09 - 和返回的申购处理程序完成... 10 - 然后也是如此 ObserveOn 的订阅处理器... 11 - 所以控制返回到主方法 12 - 同时, ObserveOn 已穿梭返回的 OnCompleted 称之为过,因为它是异步运行在09-11主题2。这可能发生在任何时间。碰巧它终于现在叫。什么是典型的用例?您会经常看到 SubscribeOn 在GUI中使用,当你需要订阅来一个长期运行的观察到的,要下车调度线程尽快 - 也许是因为你知道这是那些观测,做在订阅处理一切的工作之一。在观察到链的末端应用它,因为当你订购这是第一次观察到的调用。您会经常看到 ObserveOn 在GUI中使用时,要确保 OnNext , OnCompleted 和的OnError 调用都会返回到调度线程。将它应用在可观测链的末端,以转换回尽可能晚。希望你可以看到,回答你的问题是, ObserveOnDispatcher 不会做出任何区别了其中,和的SelectMany 正在上执行 - 这一切都取决于什么线程的流的是由调用它们!流的申购处理程序将被调用线程上调用,但很难说其中其中,和的SelectMany 将运行不知道如何流的实施。观测量与活得比的订阅拨打寿命到现在为止,我们一直在寻找独家 Observable.Return 。返回完成它的订阅处理程序内流。这不是典型的,但它也同样常见的流活得比的认购处理程序。看看 Observable.Timer 例如: Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Timer(TimeSpan.FromSeconds(1))间谍(定时器)。source.Subscribe();Console.WriteLine(订阅返回); 这将返回以下内容:从螺纹 电话:1定时器:在线程观测获得:1计时器:上订阅了主题:1定时器:认购完成。订阅返回计时器:OnNext(0)上螺纹:2计时器:OnCompleted()在主题:2 您可以清楚地看到认购完成后再 OnNext 和 OnCompleted 被在不同的线程后来被称为请注意,这不结合 SubscribeOn 或 ObserveOn 将拥有的任何影响任何的哪个线程或调度定时器据法权产调用 OnNext 和 OnCompleted 在当然,你可以使用 SubscribeOn 确定认购螺纹: Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Timer(TimeSpan.FromSeconds(1))间谍(定时器)。source.SubscribeOn(NewThreadScheduler.Default).Spy(SubscribeOn)订阅()。Console.WriteLine(订阅返回); (我故意更改为 NewThreadScheduler 在定时器发生的情况下,这里prevent混乱得到相同的线程池线程的 SubscribeOn )捐赠:从螺纹 电话:1定时器:在线程观测获得:1SubscribeOn:在线程观测获得:1SubscribeOn:上订阅了主题:1SubscribeOn:认购完成。订阅返回计时器:上订阅了主题:2定时器:认购完成。计时器:OnNext(0)的主题:3SubscribeOn:OnNext(0)的主题:3计时器:OnCompleted()的主题:3SubscribeOn:OnCompleted()的主题:3 在这里,你可以清楚地看到线程的主线程(1)回国后的订阅通话,但定时器订阅得到它自己的线程(2),但 OnNext 和 OnCompleted 呼吁线程上运行(3)。现在的 ObserveOn ,让我们改变了code到(对于那些下沿code,使用的NuGet包RX-WPF): VAR调度= Dispatcher.CurrentDispatcher;Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Timer(TimeSpan.FromSeconds(1))间谍(定时器)。。source.ObserveOnDispatcher()间谍(ObserveOn)订阅()。Console.WriteLine(订阅返回); 这code是一个有点不同。第一线,确保我们有一个调度员,我们也将在 ObserveOnDispatcher - 这就像 ObserveOn ,除了它指定要使用 DispatcherScheduler 的任意线程的 ObserveOnDispatcher 上的评价。这code给出了下面的输出:从螺纹 电话:1定时器:在线程观测获得:1ObserveOn:在线程观测获得:1ObserveOn:上订阅了主题:1计时器:上订阅了主题:1定时器:认购完成。ObserveOn:认购完成。订阅返回计时器:OnNext(0)上螺纹:2ObserveOn:OnNext(0)的主题:1计时器:OnCompleted()在主题:2ObserveOn:OnCompleted()上的主题:1 请注意,调度器(和主线程)是线程1 定时器仍呼吁 OnNext 和 OnCompleted 其选择的线程(2) - 但 ObserveOnDispatcher 将编组调用返回到调度线程,线程(1 )。另外请注意,如果我们要阻止调度线程(被说 Thread.sleep代码),你会看到ObserveOnDispatcher会阻塞(这code ++工程最好的一个LINQPad主要方法内): VAR调度= Dispatcher.CurrentDispatcher;Console.WriteLine(从线程中调用:+ Thread.CurrentThread.ManagedThreadId);无功源= Observable.Timer(TimeSpan.FromSeconds(1))间谍(定时器)。。source.ObserveOnDispatcher()间谍(ObserveOn)订阅()。Console.WriteLine(订阅返回);Console.WriteLine(阻断调度员);Thread.sleep代码(2000);Console.WriteLine(畅通); 你会看到类似这样的输出:从螺纹 电话:1定时器:在线程观测获得:1ObserveOn:在线程观测获得:1ObserveOn:上订阅了主题:1计时器:上订阅了主题:1定时器:认购完成。ObserveOn:认购完成。订阅返回阻塞调度计时器:OnNext(0)上螺纹:2计时器:OnCompleted()在主题:2畅通ObserveOn:OnNext(0)的主题:1ObserveOn:OnCompleted()上的主题:1 通过通过调用 ObserveOnDispatcher 只能够走出一旦睡眠已运行。要点这是很有用的记住,无扩展本质上是一个自由线程库,并试图尽可能懒惰地了解哪些线程运行的 - 你必须要刻意与 ObserveOn , SubscribeOn ,并通过特定的调度来接受他们的运营商来改变这一点。没有什么消费者可观察的可以做来控制它的内部做 - ObserveOn 和 SubscribeOn 是装饰的包装观察员和观测的表面积元帅跨线程调用。希望这些例子已经说的很清楚。Based on reading this question: What's the difference between SubscribeOn and ObserveOnObserveOn sets where the code is in the Subscribe handler is is executed:stream.Subscribe(_ => { // this code here });The SubscribeOn method sets which thread the setup of the stream is done on.I'm led to understand that if these aren't explicitly set, then the TaskPool is used.Now my question is, lets say I do something like this:Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));Where are the Where predicate and SelectMany lots_of being executed, given that some_action is being executed on the dispatcher? 解决方案 There's a lot of misleading info out there about SubscribeOn and ObserveOn.SummarySubscribeOn intercepts calls to the single method of IObservable<T>, which is Subscribe, and calls to Dispose on the IDisposable handle returned by Subscribe.ObserveOn intercepts calls to the methods of IObserver<T>, which are OnNext, OnCompleted & OnError.Both methods cause the respective calls to be made on the specified scheduler. Analysis & DemonstrationsThe statement is more confusing than helpful. What you are referring to as the "Subscribe handler" is really an OnNext handler. Remember, the Subscribe method of IObservable accepts an IObserver that has OnNext, OnCompleted and OnError methods, but it is extension methods that provide the convenience overloads that accept lambdas and build an IObserver implementation for you.Let me appropriate the term though; I think of the "Subscribe handler" being the code in the observable that is invoked when Subscribe is called. In this way, the description above more closely resembles the purpose of SubscribeOn.SubscribeOnSubscribeOn causes the Subscribe method of an observable to be executed asynchronously on the specified scheduler or context. You use it when you don't want to call the Subscribe method on an observable from whatever thread you are running on - typically because it can be long-running and you don't want to block the calling thread.When you call Subscribe, you are calling an observable that may be part of a long chain of observables. It's only the observable that SubscribeOn is applied to that it effects. Now it may be the case that the all the observables in the chain will be subscribed to immediately and on the same thread - but it doesn't have to be the case. Think about Concat for example - that only subscribes to each successive stream once the preceding stream has finished, and typically this will take place on whatever thread the preceding stream called OnCompleted from.So SubscribeOn sits between your call to Subscribe and the observable you are subscribing to, intercepting the call and making it asychronous.It also affects disposal of subscriptions. Subscribe returns an IDisposable handle which is used to unsubscribe. SubscribeOn ensures calls to Dispose are scheduled on the supplied scheduler.A common point of confusion when trying to understand what SubscribeOn does is that the Subscribe handler of an observable may well call OnNext, OnCompleted or OnError on this same thread. However, it's purpose is not to affect these calls. It's not uncommon for a stream to complete before the Subscribe method returns. Observable.Return does this, for example. Let's take a look.If you use the Spy method I wrote, and run the following code:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.Subscribe();Console.WriteLine("Subscribe returned");You get this output (thread id may vary of course):Calling from Thread: 1Return: Observable obtained on Thread: 1Return: Subscribed to on Thread: 1Return: OnNext(1) on Thread: 1Return: OnCompleted() on Thread: 1Return: Subscription completed.Subscribe returnedYou can see that the entire subscription handler ran on the same thread, and finished before returning.Let's use SubscribeOn to run this asynchronously. We will Spy on both the Return observable and the SubscribeOn observable:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();Console.WriteLine("Subscribe returned");This outputs (line numbers added by me):01 Calling from Thread: 102 Return: Observable obtained on Thread: 103 SubscribeOn: Observable obtained on Thread: 104 SubscribeOn: Subscribed to on Thread: 105 SubscribeOn: Subscription completed.06 Subscribe returned07 Return: Subscribed to on Thread: 208 Return: OnNext(1) on Thread: 209 SubscribeOn: OnNext(1) on Thread: 210 Return: OnCompleted() on Thread: 211 SubscribeOn: OnCompleted() on Thread: 212 Return: Subscription completed.01 - The main method is running on thread 1.02 - the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.03 - the SubscribeOn observable is evaluated on the calling thread.04 - Now finally we call the Subscribe method of SubscribeOn.05 - The Subscribe method completes asynchronously...06 - ... and thread 1 returns to the main method. This is the effect of SubscribeOn in action!07 - Meanwhile, SubscribeOn scheduled a call on the default scheduler to Return. Here it is received on thread 2.08 - And as Return does, it calls OnNext on the subscribe thread...09 - and SubscribeOn is just a pass through now.10,11 - Same for OnCompleted12 - And last of all the Return subscription handler is done.Hopefully that clears up the purpose and effect of SubscribeOn!ObserveOnIf you think of SubscribeOn as an interceptor for the Subscribe method that passes the call on to a different thread, then ObserveOn does the same job, but for the OnNext, OnCompleted and OnError calls.Recall our original example:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.Subscribe();Console.WriteLine("Subscribe returned");Which gave this output:Calling from Thread: 1Return: Observable obtained on Thread: 1Return: Subscribed to on Thread: 1Return: OnNext(1) on Thread: 1Return: OnCompleted() on Thread: 1Return: Subscription completed.Subscribe returnedNow lets alter this to use ObserveOn:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");We get the following output:01 Calling from Thread: 102 Return: Observable obtained on Thread: 103 ObserveOn: Observable obtained on Thread: 104 ObserveOn: Subscribed to on Thread: 105 Return: Subscribed to on Thread: 106 Return: OnNext(1) on Thread: 107 ObserveOn: OnNext(1) on Thread: 208 Return: OnCompleted() on Thread: 109 Return: Subscription completed.10 ObserveOn: Subscription completed.11 Subscribe returned12 ObserveOn: OnCompleted() on Thread: 201 - The main method is running on Thread 102 - As before, the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.03 - The ObserveOn observable is evaluation on the calling thread too.04 - Now we subscribe, again on the calling thread, first to the ObserveOn observable...05 - ... which then passes the call through to the Return observable.06 - Now Return calls OnNext in it's Subscribe handler07 - Here is the effect of ObserveOn. We can see that the OnNext is scheduled asynchronously on Thread 2.08 - Meanwhile Return calls OnCompleted on Thread 1...09 - And Return's subscription handler completes...10 - and then so does ObserveOn's subscription handler...11 - so control is returned to the main method12 - Meanwhile, ObserveOn has shuttled Return's OnCompleted call this over to Thread 2. This could have happened at any time during 09-11 because it is running asynchronously. Just so happens it's finally called now.What are the typical use cases?You will most often see SubscribeOn used in a GUI when you need to Subscribe to a long running observable and want to get off the dispatcher thread as soon as possible - maybe because you know it's one of those observables that does all it's work in the subscription handler. Apply it at the end of the observable chain, because this is the first observable called when you subscribe.You will most often see ObserveOn used in a GUI when you want to ensure OnNext, OnCompleted and OnError calls are marshalled back to the dispatcher thread. Apply it at the end of the observable chain to transition back as late as possible.Hopefully you can see that the answer to your question is that ObserveOnDispatcher won't make any difference to the threads that Where and SelectMany are executed on - it all depends what thread stream is calling them from! stream's subscription handler will be invoked on the calling thread, but it's impossible to say where Where and SelectMany will run without knowing how stream is implemented.Observables with lifetimes that outlive the Subscribe callUp until now, we've been looking exclusively at Observable.Return. Return completes it's stream within the subscribe handler. That's not atypical, but it's equally common for streams to outlive the subscribe handler. Look at Observable.Timer for example:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.Subscribe();Console.WriteLine("Subscribe returned");This returns the following:Calling from Thread: 1Timer: Observable obtained on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.Subscribe returnedTimer: OnNext(0) on Thread: 2Timer: OnCompleted() on Thread: 2You can clearly see the subscription complete and then OnNext and OnCompleted being called later on a different thread.Note, that no combination of SubscribeOn or ObserveOn will have any effect whatsoever on which thread or scheduler Timer choses to invoke OnNext and OnCompleted on.Sure, you can use SubscribeOn to determine the subscribe thread:Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();Console.WriteLine("Subscribe returned");(I am deliberately changing to the NewThreadScheduler here to prevent confusion in the case of Timer happening to get the same thread pool thread as SubscribeOn)Giving:Calling from Thread: 1Timer: Observable obtained on Thread: 1SubscribeOn: Observable obtained on Thread: 1SubscribeOn: Subscribed to on Thread: 1SubscribeOn: Subscription completed.Subscribe returnedTimer: Subscribed to on Thread: 2Timer: Subscription completed.Timer: OnNext(0) on Thread: 3SubscribeOn: OnNext(0) on Thread: 3Timer: OnCompleted() on Thread: 3SubscribeOn: OnCompleted() on Thread: 3Here you can clearly see the main thread on thread (1) returning after it's Subscribe calls, but the Timer subscription getting it's own thread (2), but the OnNext and OnCompleted calls running on thread (3).Now for ObserveOn, let's change the code to (for those following along in code, use nuget package rx-wpf):var dispatcher = Dispatcher.CurrentDispatcher;Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");This code is a little different. The first line ensures we have a dispatcher, and we also bring in ObserveOnDispatcher - this is just like ObserveOn, except it specifies we should use the DispatcherScheduler of whatever thread ObserveOnDispatcher is evaluated on.This code gives the following output:Calling from Thread: 1Timer: Observable obtained on Thread: 1ObserveOn: Observable obtained on Thread: 1ObserveOn: Subscribed to on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.ObserveOn: Subscription completed.Subscribe returnedTimer: OnNext(0) on Thread: 2ObserveOn: OnNext(0) on Thread: 1Timer: OnCompleted() on Thread: 2ObserveOn: OnCompleted() on Thread: 1Note that the dispatcher (and main thread) are thread 1. Timer is still calling OnNext and OnCompleted on the thread of its choosing (2) - but the ObserveOnDispatcher is marshalling calls back onto the dispatcher thread, thread (1).Also note that if we were to block the dispatcher thread (say by a Thread.Sleep) you would see that the ObserveOnDispatcher would block (this code works best inside a LINQPad main method):var dispatcher = Dispatcher.CurrentDispatcher;Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");Console.WriteLine("Blocking the dispatcher");Thread.Sleep(2000);Console.WriteLine("Unblocked");And you'll see output like this:Calling from Thread: 1Timer: Observable obtained on Thread: 1ObserveOn: Observable obtained on Thread: 1ObserveOn: Subscribed to on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.ObserveOn: Subscription completed.Subscribe returnedBlocking the dispatcherTimer: OnNext(0) on Thread: 2Timer: OnCompleted() on Thread: 2UnblockedObserveOn: OnNext(0) on Thread: 1ObserveOn: OnCompleted() on Thread: 1With the calls through the ObserveOnDispatcher only able to get out once the Sleep has run.Key pointsIt's useful to keep in mind that Reactive Extensions is essentially a free-threaded library, and tries to be as lazy as possible about what thread it runs on - you have to deliberately interfere with ObserveOn, SubscribeOn and passing specific schedulers to operators that accept them to change this.There's nothing a consumer of an observable can do to control what it's doing internally - ObserveOn and SubscribeOn are decorators that wrap the surface area of observers and observables to marshal calls across threads. Hopefully these examples have made that clear. 这篇关于ObserveOn和SubscribeOn - 那里的工作正在做的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 09-26 23:43