我正在使用Rx中的BufferWithTime()来批量处理消息。如果我的OnNext方法花费的时间比使用的时间间隔长,我是否可以同时执行两次OnNext方法的调用?

换句话说,将以绝对值评估BufferWithTime()调用中指定的时间间隔,还是在OnNext方法返回后将其作为“异步睡眠”?

最佳答案

不,OnNext的呼叫将排队。下面的代码在缓冲时间过去时以及对订户的OnNext调用的开始/结束(在该时间中,缓冲睡眠时间是缓冲时间的4倍)写入跟踪:

Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
    .Take(50)
    .BufferWithTime(TimeSpan.FromMilliseconds(100))
    .Do(_ => Trace.WriteLine("Buffer elapsed"))
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(_ =>
    {
        Trace.WriteLine("Begin OnNext");
        Thread.Sleep(200);
        Trace.WriteLine("End OnNext");
    });


输出如下。您可以看到,即使在Buffer elapsed调用期间两次OnNext发生,Begin / End OnNext也不会重叠:


缓冲已用
开始OnNext
缓冲已用
缓冲已用
在下一个结束
开始OnNext
缓冲已用
缓冲已用
缓冲已用
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束

10-07 19:19
查看更多