我想要实现的是缓冲来自某些 IObservable 的传入事件(它们以突发形式出现)并进一步释放它们,但在均匀间隔内一个一个地释放。
像这样:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
由于我对 Rx 很陌生,因此我不确定是否已经有一个 Subject 或一个操作符可以做到这一点。也许它可以通过组合来完成?
更新:
谢谢
Richard Szalay 用于指出 Drain 运算符,我发现了另一个 Drain 运算符用法的 example by James Miles。这是我设法让它在 WPF 应用程序中工作的方法:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
我玩得很开心,因为省略 scheduler 参数会导致应用程序在 Debug模式下崩溃而没有任何异常出现(我需要学习如何处理 Rx 中的异常)。
Process 方法直接修改 UI 状态,但我想从中制作 IObservable 非常简单(使用 ISubject?)。
更新:
与此同时,我一直在试验 ISubject,下面的类做了我想要的——它及时释放了缓冲的 Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}
if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}
public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}
public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}
为了清楚起见,这个简单的实现从 OnCompleted 和 OnError 中剥离,也只允许单个订阅。
最佳答案
它实际上比听起来更狡猾。
使用 Delay
不起作用,因为这些值仍然会大量发生,只是稍微延迟。
将 Interval
与 CombineLatest
或 Zip
一起使用不起作用,因为前者将导致跳过源值,后者将缓冲间隔值。
我认为新的 Drain
运算符( added in 1.0.2787.0 ),结合 Delay
应该可以解决问题:
source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
Drain
运算符的工作方式与 SelectMany
类似,但要等到前一个输出完成后再使用下一个值调用选择器。它仍然不是你想要的(块中的第一个值也将被延迟),但它很接近:上面的用法现在与你的大理石图相匹配。编辑: 显然框架中的
Drain
不像 SelectMany
那样工作。我会在官方论坛上寻求一些建议。与此同时,这里有一个 Drain 的实现,它可以完成你所追求的:编辑 09/11: 修复了实现中的错误并更新了用法以匹配您请求的大理石图。
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}
关于c# - 一种以均匀间隔推送缓冲事件的方法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/4123178/