我有两个有序的 IObservable<double>
并且想将它们合并为一个有序的 IObservable<double>
。下面给出一个例子:
A 2 3 4 - - 5 -
B - - - 1 5 - 6
Out - - - 1 2 3 4 5 -
这个想法是
Out
只有在确定最终顺序时才会产生值。我确信这应该很容易做到,但我无法提出一个很好的解决方案(在这种情况下,好的意味着尽可能多地由 rx 运算符组成);编辑:我希望以下程序产生以下输出
static void Main(string[] args)
{
var a = new Subject<int>();
var b = new Subject<int>();
a.MergeSort(b).Subscribe(Console.WriteLine);
a.OnNext(2);
Console.WriteLine("tick");
a.OnNext(4);
Console.WriteLine("tick");
a.OnNext(6);
Console.WriteLine("tick");
b.OnNext(0);
Console.WriteLine("tick");
b.OnNext(1);
Console.WriteLine("tick");
b.OnNext(5);
Console.WriteLine("tick");
b.OnNext(7);
Console.WriteLine("tick");
}
Output:
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick
最佳答案
这是作为 RX 扩展运营商
public static class MergeMixins
{
public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other)
{
return Observable.Create<int>((observer) =>
{
Queue<int> BufferA = new Queue<int>();
Queue<int> BufferB = new Queue<int>();
Action<Queue<int>, int> update = (Queue<int> pushBuffer, int value)=>{
pushBuffer.Enqueue(value);
while (BufferA.Count() != 0 && BufferB.Count() != 0)
{
if (BufferA.Peek() < BufferB.Peek())
observer.OnNext(BufferA.Dequeue());
else
observer.OnNext(BufferB.Dequeue());
}
};
return new CompositeDisposable(
This.Subscribe(v => update(BufferA, v)),
other.Subscribe(v => update(BufferB, v)));
});
}
}
广告我的测试输出正在使用您的测试
Result StandardOutput:
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick
关于c# - 合并两个有序的 IObservable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13484587/