问题描述
我们最近从移植RX 1.11111系统RX 2.0,并发现了这个问题。我们使用EventLoopScheduler为ObserveOn是这样的:
We have recently ported the system from RX 1.11111 to RX 2.0 and discovered this problem. We use an EventLoopScheduler for ObserveOn like this:
IDisposable subscription = someSubject
.ObserveOn(m_eventLoopScheduler)
.SomeMoreRXFunctions()
.Subscribe((something)=>something)
调度设置在应用程序退出( m_eventLoopScheduler.Dispose
)。在此之前,我们处理的所有订阅的观察到的( subscription.Dispose
)。
The scheduler is disposed on application exit (m_eventLoopScheduler.Dispose
). Before that we dispose of all the subscriptions to the observable (subscription.Dispose
).
尽管如此,我们得到一个的ObjectDisposedException
中的 EventLoopScheduler.Schedule
。这是不可能捕捉到异常,因为它在RX线程起源。这几乎就像处置没有摆脱在一些队列中的所有项目。
Despite that, we are getting an ObjectDisposedException
inside the EventLoopScheduler.Schedule
. It's impossible to catch that exception because it originates in an RX thread. It's almost like the Dispose doesn't get rid of all the items in some queue.
我们试图删除调用 EventLoopScheduler.Dispose
和异常消失。但随后在 SomeMoreRXFunctions()中的代码
进行约10次以上执行,虽然所有的订阅被弃置。
We tried to remove the call to EventLoopScheduler.Dispose
and the exception disappeared. But then the code in SomeMoreRXFunctions()
was executed for about 10 more times although all the subscriptions were disposed.
有一些其他的方式来正确关闭 EventLoopScheduler
?
Is there some other way to properly close the EventLoopScheduler
?
推荐答案
(对不起,无法抗拒的双关语!)的IObservable<出T>
,几乎每个接收运营商实现的接口,只有一个重要的方法:
Some observations about subscriptions
(Sorry, couldn't resist the pun!) IObservable<out T>
, the interface implemented by almost every Rx operator, has just one vital method:
IDisposable Subscribe(IObserver<T> observer);
这纯粹是通过这种方法,它的返回值的处置,一个观察者(实施 IObserver< T>
)可确定何时订阅开始和结束
It is purely through this method and the disposal of it's return value that an observer (implementing IObserver<T>
) can determine when a subscription starts and ends.
当一个订阅,以可观察到的提出,是一部分。链的,一般的(直接或间接地),这将导致订阅进一步向上链。确切地说,如果当发生这种情况是下降到给定观测
When a subscription is made to an Observable that is part of a chain, generally (either directly or indirectly), this will result in a subscription further up the chain. Precisely if and when this happens is down to that given Observable.
在许多情况下,订阅关系接收到由订阅不是一对一的。这样的一个例子是发布(),这将只具有至多一个订阅到其源,而不管接收的订阅的数量。这是真的发布的全部意义。
In many cases, the relationship between subscriptions received to subscriptions made is not one-to-one. An example of this is Publish(), which will only have at most one subscription to its source, regardless of the number of subscriptions it receives. This is really the whole point of Publish.
在其他情况下,这种关系有一个时间方面。例如,CONCAT()将不会认购其第二流,直到第一个具有 OnCompleted()
! - 这可能是永远
In other cases, the relationship has a temporal aspect. For example, Concat() won't subscribe to its second stream until the first has OnCompleted()
- which could be never!
这是值得花一些时间在这里检查的,因为他们有一些非常相关话要说:
It's worth taking a moment here to examine the Rx Design Guidelines, as they have some very relevant things to say:
4.4。假设一个最好的努力,停止一切退订的杰出工作。当退订被称为上可观察到的订阅,可观察序列将做出最大努力,设法停止所有
出色的工作。这意味着,任何排队尚未启动将不会开始工作。
所有的工作,已经在进行中可能还是完整的,因为它并不总是安全的中止的工作,正在进行中。这项工作的结果也不会发生任何之前订阅的观察者实例。
Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signalled to any previously subscribed observer instances.
请注意这里的影响;底线是,它的的完全降低到可观察的实施,当任何上游订阅可能提出或处理的。换句话说,有绝对没有保证的订阅处理将导致一个观测处置任何或所有已直接或间接作出的订阅。这也适用于由操作员使用的任何其他资源(如调度操作),或者它的上游订阅。
The bottom line
Note the implications here; the bottom line is that it's entirely down to the implementation of an Observable when any upstream subscriptions might be made or disposed. In other words, there is absolutely no guarantee that disposing of subscriptions will cause an Observable to dispose any or all of the subscriptions it has either made directly or indirectly. And that goes for any other resources (such as scheduled actions) used by the operator or it's upstream subscriptions.
您所能期待的最好结果就是每个的上游运营商确实已经做出了最大努力,停止所有未完成的工作。
The best you can hope for is that the author of every upstream operator has indeed made a best effort to stop all outstanding work.
在没有看到 SomeMoreRXFunctions
的内容我不能肯定,但似乎很有可能,你所看到的例外是因为正在引起 - 尽管你处置了解订阅 - 通过配置调度您从仍在运行订阅的双脚下撕开了地毯。实际上,你造成的:
Without seeing the content of SomeMoreRXFunctions
I can't be certain, but it seems highly likely that the exception you are seeing is being caused because - in spite of disposing the subscriptions you know about - by disposing the scheduler you have ripped the rug from under the feet of still running subscriptions. Effectively, you are causing this:
void Main()
{
var scheduler = new EventLoopScheduler();
// Decide it's time to stop
scheduler.Dispose();
// The next line will throw an ObjectDisposedException
scheduler.Schedule(() => {});
}
它容易写一个完全合理的运营商可能会导致这样的问题 - 甚至一不直接使用调度!试想一下:
It's easy to write a perfectly reasonable operator that can cause this problem - even one that doesn't directly use a scheduler! Consider this:
public static class ObservableExtensions
{
public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
(this IObservable<TSource> source, IObservable<TDelay> delay)
{
return Observable.Create<TSource>(observer =>
{
var subscription = new SerialDisposable();
subscription.Disposable = delay
.IgnoreElements()
.Subscribe(_ => {}, () => {
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
});
return subscription;
});
}
}
这运营商将订阅源,一旦通过延迟可观察到的已完成。你看它是多么合理 - 它使用了 SerialDisposable
来正确显示两种基础在时间上分开订阅它的观察者作为一个单一的一次性的。
This operator will subscribe to the source once the passed delay observable has completed. Look how reasonable it is - it uses a SerialDisposable
to correctly present the two underlying temporally separate subscriptions to it's observer as a single disposable.
然而,这是微不足道的颠覆该运营商并获得它会导致异常:
However, it's trivial to subvert this operator and get it to cause an exception:
void Main()
{
var scheduler = new EventLoopScheduler();
var rx = Observable.Range(0, 10, scheduler)
.ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
var subs = rx.Subscribe();
Thread.Sleep(TimeSpan.FromSeconds(2));
subs.Dispose();
scheduler.Dispose();
}
这里发生了什么?我们正在创建一个范围
在EventLoopScheduler,而是依附于我们的 ReasonableDelay
与一个定时使用它的默认调度。
What's happening here? We are creating a Range
on the EventLoopScheduler, but attaching our ReasonableDelay
with delay stream created with a Timer
using it's default scheduler.
现在我们同意,等到我们的延迟流完毕,然后我们处理我们的订阅和EventLoopScheduler在正确的命令。
Now we subscribe, wait until our delay stream is completed, then we dispose our subscription and the EventLoopScheduler in the "right order".
人工延迟我插入 Thread.sleep代码
确保竞争条件,可以很容易地自然发生 - 延迟完成后,认购已被释放,但为时已晚,以防止范围
运营商访问处置EventLoopScheduler。
The artifical delay I inserted with Thread.Sleep
ensures a race condition that could easily occur naturally - the delay has completed, the subscription has been disposed but it's too late to prevent the Range
operator accessing the disposed EventLoopScheduler.
我们甚至可以拉紧我们合理的努力来检查,一旦延迟部分已完成观察者有退订:
We can even tighten up our reasonable efforts to check if the observer has unsubscribed once the delay portion has completed:
// In the ReasonableDelay method
.Subscribe(_ => {}, () => {
if(!subscription.IsDisposed) // Check for unsubscribe
{
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
}
});
这不会帮助。有没有办法使用该运营商之一的背景下纯粹的锁定语义。
It won't help. There's no way to use locking semantics purely in the context of this operator either.
您没有业务处理的EventLoopScheduler!一旦你它传递给其他运营商接收,你已经通过了它的责任。这是到接收运营商遵循的准则的清理他们的订阅及时的方式尽可能 - 这将意味着直接或间接地取消对EventLoopScheduler任何未决的计划项目,并停止任何进一步的安排,以便它的队列尽快清空。可能的
You have no business disposing that EventLoopScheduler! Once you have passed it to other Rx Operators, you have passed on the responsibility for it. It's up to the Rx Operators to follow the guidelines an clean up their subscriptions in as timely a manner as possible - which would mean directly or indirectly cancelling any pending scheduled items on the EventLoopScheduler and stopping any further scheduling so that it's queue empties as quickly as possible.
在上面的例子中,你可以归因于问题了几分做作使用多个调度程序和ReasonableDelay强制睡眠 - 但它不是很难想象一个真正的场景:操作员不能立即清理。
In the example above, you could attribute the issue to the somewhat contrived use of multiple schedulers and the forced Sleep in ReasonableDelay - but it's not hard to image a genuine scenario where an operator can't clean up immediately.
从本质上讲,通过设置接收调度你正在做接收相当于一个线程中止的。而就在那种情况下,你可能有例外情况处理!
Essentially, by disposing the Rx scheduler you are doing the Rx equivalent of a thread abort. And just as in that scenario, you may have exceptions to handle!
做的正确的事情是拉开神秘的 SomeMoreRXFunctions()
,并确保他们都秉承尽可能合理地可能的准则。
The right thing to do is pull apart the mysterious SomeMoreRXFunctions()
and ensure they are adhering to the guidelines as much as is reasonably possible.
这篇关于RX2.0:diposing EventLoopScheduler后的ObjectDisposedException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!