IConnectableObservable

IConnectableObservable

在尝试处理最坏的情况时,我偶然发现了一个问题。在完成之前处理具有预订的IConnectableObservable

我写了一个人为的例子,重现了这个问题。

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var disposable = hotSource.Connect();


这是我的热门观察。我添加了一个延迟,以便可以在它触发值之前对其进行订阅。

在下面,我返回一个等待的可观察值,它应该允许我等到hotSource完成。我相信这一点我应该订阅

var awaitable = hotSource
    .Do((Console.WriteLine))
    .Finally(() => Console.WriteLine("Complete"))
    .LastOrDefaultAsync();


现在,如果我立即处理,则等待我的订阅:

disposable.Dispose();
await awaitable;


它只是完全挂起了应用程序,并且从未调用Finally

我期望我的awaitable返回OnCompleted或返回最差的OnErrorObjectDisposedException。此外,如果您在订阅后连接,它仍然会挂起。有什么想法吗?

[更新]

接受supertoi的回答(直到处置之后我才进行订阅),我在处置之前通过显式订阅重新制定了问题。

var mutex = new SemaphoreSlim(0);

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var subscription = Observable.Create(
    (IObserver<int> observer) =>
    {
        Console.WriteLine("Subscribed");
        return hotSource.Subscribe(observer);
    })
    .Finally(() => mutex.Release())
    .Subscribe(Console.WriteLine);

hotSource
    .Connect()
    .Dispose();

await mutex.WaitAsync();
Console.WriteLine("Complete");


这再次停止,但是在IConnectableObservable被处置之前,它挑衅地订阅了。

最佳答案

LastOrDefaultAsync()返回一个IObservable,因此它不会使您订阅该序列。
await订阅您的序列。

IConnectableObservable.Dispose()上检查the comment


  Disposable用于断开可观察包装与其来源的连接,从而导致订阅的观察者停止接收来自基础可观察序列的值。


这意味着将停止发出包括OnNextOnErrorOnCompleted在内的所有通知。

因此,如果在处置await之后IConnectableObservable您将不会收到任何通知。您可以再次Connecthotsource接收通知。

10-04 10:00