在尝试处理最坏的情况时,我偶然发现了一个问题。在完成之前处理具有预订的IConnectableObservable
。
我写了一个人为的例子,重现了这个问题。
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var disposable = hotSource.Connect();
这是我的热门观察。我添加了一个延迟,以便可以在它触发值之前对其进行订阅。
在下面,我返回一个等待的可观察值,它应该允许我等到hotSource完成。我相信这一点我应该订阅
var awaitable = hotSource
.Do((Console.WriteLine))
.Finally(() => Console.WriteLine("Complete"))
.LastOrDefaultAsync();
现在,如果我立即处理,则等待我的订阅:
disposable.Dispose();
await awaitable;
它只是完全挂起了应用程序,并且从未调用
Finally
。我期望我的
awaitable
返回OnCompleted
或返回最差的OnError
和ObjectDisposedException
。此外,如果您在订阅后连接,它仍然会挂起。有什么想法吗?[更新]
接受supertoi的回答(直到处置之后我才进行订阅),我在处置之前通过显式订阅重新制定了问题。
var mutex = new SemaphoreSlim(0);
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var subscription = Observable.Create(
(IObserver<int> observer) =>
{
Console.WriteLine("Subscribed");
return hotSource.Subscribe(observer);
})
.Finally(() => mutex.Release())
.Subscribe(Console.WriteLine);
hotSource
.Connect()
.Dispose();
await mutex.WaitAsync();
Console.WriteLine("Complete");
这再次停止,但是在
IConnectableObservable
被处置之前,它挑衅地订阅了。 最佳答案
LastOrDefaultAsync()
返回一个IObservable
,因此它不会使您订阅该序列。await
订阅您的序列。
在IConnectableObservable.Dispose()
上检查the comment
Disposable用于断开可观察包装与其来源的连接,从而导致订阅的观察者停止接收来自基础可观察序列的值。
这意味着将停止发出包括OnNext
,OnError
和OnCompleted
在内的所有通知。
因此,如果在处置await
之后IConnectableObservable
您将不会收到任何通知。您可以再次Connect
到hotsource
接收通知。