问题描述
我有一个方法 void OnAction(Action callback)
,我想使用反应式扩展 (Rx) 从中创建一个 IObservable
.
I have a method void OnAction(Action<Person> callback)
and I wanna create an IObservable<T>
from this using the reactive extensions (Rx).
我找到了两种可以帮助我的方法:Observable.FromEvent()
和 Observable.Start()
:
I have found two methods that could help me: Observable.FromEvent()
and Observable.Start()
:
var observable = Observable.Start(() =>
{
Person person = null;
_mngr.OnAction(p => person = p);
return person;
});
和:
var observable = Observable.FromEvent<Person>(
action => _mngr.OnAction(action), //Add Handler
action => // Remove Handler
{
});
第一个有一个闭包,我必须评估if person != null
:
The first one have an closure and I must evaluate if person != null
:
var foo= observable.Where(p =>
{
if(p!=null) //...
});
第二个采用 Action 参数,将给定的事件处理程序与底层 .NET 事件分离...但 OnAction 方法不是 .NET 事件.
The second one takes an Action argument that detaches the given event handler from the underlying .NET event... But OnAction method isn't a .NET event.
这两种方法都很好,但(在我看来)闻起来...
Both ways works well, but (in my opinion) smells...
那么,从 OnAction 方法创建 IObservable 的最佳方法是什么?
So, what is the best way to create an IObservable from OnAction Method?
推荐答案
详细说明 Chris 的回答并提出您的意见.从这里开始:
To elaborate on Chris's answer and address your comments. Starting from here:
var personAsObservable = Observable.Create<Person>(observer => {
_mngr.OnAction(person => {
observer.OnNext(person);
observer.OnCompleted();
});
return Disposable.Empty;
});
就目前而言,这将导致为每个订阅者调用 OnAction
.
As it stands, this will cause OnAction
to be called for every subscriber.
避免这种情况的一般方法是发布 observable.发布流会导致订阅者共享事件.
The general approach for avoiding this is to publish the observable. Publishing a stream causes subscribers to share events.
Publish
运算符返回一个可连接的可观察对象.这可以接受订阅者,但在您调用 Connect()
之前不会实际订阅底层流 - 一种返回 IDisposable
的方法,您可以使用它来控制单个连接到基础 observable - 将其设置为取消订阅.
The Publish
operator returns a connectable observable. This can accept subscribers, but won't actually subscribe to the underlying stream until you call Connect()
- a method that returns an IDisposable
you can use to control the single connection to the underlying observable - dispose it to unsubscribe.
有几个与发布相关的运算符可帮助您管理对基础流的订阅.
There are several operators related to publishing that help you govern subscriptions to the underlying stream.
RefCount
与可连接的 observable 一起工作,以管理连接并与订阅共享事件,只要底层订阅正在运行.完成后,后续订阅将重新启动.这可能足以满足您的目的.要使用它,请订阅以下内容(这是一个非常常见的 Rx 习惯用法):
RefCount
works with a connectable observable to manage the connection and share events with subscriptions as long as the underlying subscription is running. Once it completes, subsequent subscriptions will restart. This may be sufficient for your purposes. To use this, subscribe to the following (which is a very common Rx idiom):
var personPub = personAsObservable.Publish().RefCount();
其他方法包括将 Replay(n)
附加到源 observable,其中 n 个事件将被缓存并重播给在底层流完成后到达的子序列订阅者.因此,如果您只想获得一次结果,这很有用.请注意,您必须在 Replay
上显式调用 Connect
.您也可以直接调用 Publish
并自行管理连接.
Other approaches include appending Replay(n)
to the source observable where n events will be cached and replayed to subsequence subscribers that arrive after the underlying stream has completed. So this is useful if you want to only get results once. Note that you must call Connect
on Replay
explicitly. You can also just call Publish
and manage connecting yourself.
请注意,附加这些操作符不会改变底层 observable 的行为——所有的发布、缓存等都是在附加的操作符上完成的.所以在上面的例子中,订阅者应该使用personPub
.
Note that appending these operators does not change the behaviour of the underlying observable - all the publishing, caching etc. is done on the appended operator. So in the above example, it's personPub
that should be used by subscribers.
显式控制连接如下所示:
Controlling the connection explicitly looks like this:
IConnectableObservable<Person> personPub = personAsObservable.Publish();
var subscriberOne = personPub.Subscribe(...); // personAsObservable not started
var connection = personPub.Connect(); // *now* personAsObservable is subscribed
var subscriberTwo = personPub.Subscribe(...); // shares underlying subscription
// but could miss events
connection.Dispose(); // underlying connection terminated
// but may have already OnCompleted anyway
// in which case this is a no-op
这篇关于获得 IObservable<T> 的最佳方式来自 Action <T>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!