问题描述
我想要:
- 立即订阅
IObservable< T>
,但是立即开始缓冲接收到的任何T
(即,还没有被我的IObserver
看到) / li>
- 做一些工作。
- 当工作完成后,将缓冲区刷新到我的
IObserver< T>
并继续
- Immediately subscribe to an
IObservable<T>
, but immediately start buffering anyT
that is received (i.e. not yet seen by myIObserver<T>
). - Do some work.
- When the work is completed, flush the buffer to my
IObserver<T>
and continue
这是非常重要的订阅是第一件事。
It is quite important that the subscription is the first thing that happens.
在一个大理石图表单中,我是这样的...
In a 'marble diagram' form I am after something like this...
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
...在T + 1我订阅了一个 IObservable< bool>
r
,它本身依赖于 IObservable< int&
s1
和 IObservable< bool>
s2
。 s1
是我不控制的流, s2
是我控制的发布
开始工作。
... in that at T+1 I subscribe to an IObservable<bool>
r
that itself is dependent upon IObservable<int>
s1
and IObservable<bool>
s2
. s1
is a stream that I don't control, s2
is one that I do control (a Subject), and publish
on when the work is done.
我认为 SkipUntil
会帮助我,但这不会缓冲在 IObservable
已完成。
I thought that SkipUntil
would help me out, but that doesn't buffer the events that are received before the dependent IObservable
has completed.
这里有一些我认为可以工作的代码,但不是因为 SkipUntil
不是缓冲区。
Here's some code that I thought would work, but doesn't due to SkipUntil
not being a buffer.
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
我知道各种 Buffer
方法,他们似乎不适合在这种情况下,因为我不是真正地缓冲,只是协调活动在我的订阅开始。
I know about the various Buffer
methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.
UPDATE
我将Enigmativity的回应推广到以下可能有用的扩展方法:
I have generalised Enigmativity's response into the following extension method that might be useful:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}
推荐答案
为我工作:
var r = Observable.Create<int>(o =>
{
var rs = new ReplaySubject<int>();
var subscription1 = s1.Subscribe(rs);
var query = from f in s2.Take(1) select rs.AsObservable();
var subscription2 = query.Switch().Subscribe(o);
return new CompositeDisposable(subscription1, subscription2);
});
这篇关于如何订阅,但缓冲数据,IObservable直到另一个IObservable发布?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!