我正在使用Rx中的BufferWithTime()来批量处理消息。如果我的OnNext方法花费的时间比使用的时间间隔长,我是否可以同时执行两次OnNext方法的调用?
换句话说,将以绝对值评估BufferWithTime()调用中指定的时间间隔,还是在OnNext方法返回后将其作为“异步睡眠”?
最佳答案
不,OnNext
的呼叫将排队。下面的代码在缓冲时间过去时以及对订户的OnNext调用的开始/结束(在该时间中,缓冲睡眠时间是缓冲时间的4倍)写入跟踪:
Observable.Interval(TimeSpan.FromMilliseconds(50), Scheduler.TaskPool)
.Take(50)
.BufferWithTime(TimeSpan.FromMilliseconds(100))
.Do(_ => Trace.WriteLine("Buffer elapsed"))
.ObserveOn(Scheduler.TaskPool)
.Subscribe(_ =>
{
Trace.WriteLine("Begin OnNext");
Thread.Sleep(200);
Trace.WriteLine("End OnNext");
});
输出如下。您可以看到,即使在
Buffer elapsed
调用期间两次OnNext
发生,Begin / End OnNext也不会重叠:缓冲已用
开始OnNext
缓冲已用
缓冲已用
在下一个结束
开始OnNext
缓冲已用
缓冲已用
缓冲已用
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束
开始OnNext
在下一个结束