


I have an API which exposes an IObservable Status. But this status depends on an underlying observable source which has to be initialised via Init.


What I'd like to do is protect the users from having to do things in the right order: as it currently stands, if they try to subscribe to the Status before performing an Init, they get an exception because they source is not initialised.


So I had the genius idea of using a Subject to decouple the two: the external user subscribing to my Status is just subscribing to the Subject, then when they call Init, I subscribe to the underlying service using my Subject.

private ISubject<bool> _StatusSubject = new Subject<bool>();
public IObservable<bool> Status { get { return _StatusSubject; } }

public void Init()
    _Connection = new Connection();


However, from tests on a dummy project, the problem is that the initialisation'wakes up' my underlying Observable by subscribing the Subject to it, even if nobody has yet subscribed to the subject. That's something I'd like to avoid if possible, but I'm not sure how...


(I'm also mindful of the received wisdom that "the general rule is that if you're using a subject then you're doing something wrong")



It seems like the concept you are missing is how to know when someone starts listening and only init your underlying source. Usually you use Observable.Create or one of its siblings (Defer, Using, ...) to do this.


Here's how to do it without a Subject:

private IObservable<bool> _status = Observable.Defer(() =>
    _Connection = new Connection();
    return Underlying.GetDeferredObservable(_Connection);

public IObservable<bool> Status { get { return _status; } }


Defer will not call the init code until someone actually subscribes.


But this has a couple of potential issues:

  1. 每个观察者将建立一个新的连接
  2. 观察者退订时,连接不会被清除.


The 2nd issue is easy to solve, so let's do that first. Let's assume your Connection is disposable, in which case you can just do:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection));

public IObservable<bool> Status { get { return _status; } }


With this iteration, whenever someone subscribes, a new Connection is created and passed to the 2nd lamba method to construct the observable. Whenever the observer unsubscribes, the Connection is Disposed. If Connection is not a IDisposable, then you can use Disposable.Create(Action) to create an IDisposable which will run whatever action you need to run to cleanup the connection.


You still have the problem that each observer creates a new connection. We can use Publish and RefCount to solve that problem:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection))

public IObservable<bool> Status { get { return _status; } }

现在,当 first 观察者订阅时,将创建连接并订阅基础的observable.随后的观察者将共享连接并获取当前状态.当 last 观察者退订时,连接将被释放,所有连接均将关闭.如果此后有其他观察者订阅,则它们将再次重新备份.

Now, when the first observer subscribes, the connection will get created and the underlying observable will be subscribed. Subsequent observers will share the connection and will pick up the current status. When the last observer unsubscribes, the connection will be disposed and everything shut down. If another observer subscribes after that, it all starts back up again.

在引擎盖下,Publish实际上是在使用Subject共享单个可观察的信号源. RefCount正在跟踪当前正在观察的观察者数量.

Underneath the hood, Publish is actually using a Subject to share the single observable source. And RefCount is tracking how many observers are currently observing.


07-30 21:58