本文介绍了如何订阅,但缓冲数据,IObservable直到另一个IObservable发布?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想要:


  1. 立即订阅 IObservable< T> ,但是立即开始缓冲接收到的任何 T (即,还没有被我的 IObserver 看到) / li>
  2. 做一些工作。

  3. 当工作完成后,将缓冲区刷新到我的 IObserver< T> 并继续

  1. Immediately subscribe to an IObservable<T>, but immediately start buffering any T that is received (i.e. not yet seen by my IObserver<T>).
  2. Do some work.
  3. When the work is completed, flush the buffer to my IObserver<T> and continue

这是非常重要的订阅是第一件事。

It is quite important that the subscription is the first thing that happens.

在一个大理石图表单中,我是这样的...

In a 'marble diagram' form I am after something like this...

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

...在T + 1我订阅了一个 IObservable< bool> r ,它本身依赖于 IObservable< int& s1 IObservable< bool> s2 s1 是我不控制的流, s2 是我控制的发布开始工作。

... in that at T+1 I subscribe to an IObservable<bool> r that itself is dependent upon IObservable<int> s1 and IObservable<bool> s2. s1 is a stream that I don't control, s2 is one that I do control (a Subject), and publish on when the work is done.

我认为 SkipUntil 会帮助我,但这不会缓冲在 IObservable 已完成。

I thought that SkipUntil would help me out, but that doesn't buffer the events that are received before the dependent IObservable has completed.

这里有一些我认为可以工作的代码,但不是因为 SkipUntil 不是缓冲区。

Here's some code that I thought would work, but doesn't due to SkipUntil not being a buffer.

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));

        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());

        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));

        Console.WriteLine("Subscribing to events...");

        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");

        completed.Subscribe(x => Console.WriteLine("Completed"));

        subject.OnNext(10);

        are.WaitOne();
        Console.WriteLine("Done");

我知道各种 Buffer 方法,他们似乎不适合在这种情况下,因为我不是真正地缓冲,只是协调活动在我的订阅开始。

I know about the various Buffer methods but they don't seem appropriate in this case as I am not really buffering here, just coordinating activity at the start of my subscriptions.

UPDATE

我将Enigmativity的回应推广到以下可能有用的扩展方法:

I have generalised Enigmativity's response into the following extension method that might be useful:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}


推荐答案

为我工作:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});

这篇关于如何订阅,但缓冲数据,IObservable直到另一个IObservable发布?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-26 19:44