我有两个有序的 IObservable<double> 并且想将它们合并为一个有序的 IObservable<double> 。下面给出一个例子:

A    2  3  4  -  -       5  -
B    -  -  -  1  5       -  6
Out  -  -  -  1  2 3 4   5  -

这个想法是 Out 只有在确定最终顺序时才会产生值。我确信这应该很容易做到,但我无法提出一个很好的解决方案(在这种情况下,好的意味着尽可能多地由 rx 运算符组成);

编辑:我希望以下程序产生以下输出
static void Main(string[] args)
{
    var a = new Subject<int>();
    var b = new Subject<int>();

    a.MergeSort(b).Subscribe(Console.WriteLine);

    a.OnNext(2);
    Console.WriteLine("tick");
    a.OnNext(4);
    Console.WriteLine("tick");
    a.OnNext(6);
    Console.WriteLine("tick");
    b.OnNext(0);
    Console.WriteLine("tick");
    b.OnNext(1);
    Console.WriteLine("tick");
    b.OnNext(5);
    Console.WriteLine("tick");
    b.OnNext(7);
    Console.WriteLine("tick");
}

Output:
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick

最佳答案

这是作为 RX 扩展运营商

public static class MergeMixins
{
    public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other)
    {
        return Observable.Create<int>((observer) =>
            {
                Queue<int> BufferA = new Queue<int>();
                Queue<int> BufferB = new Queue<int>();

                Action<Queue<int>, int> update = (Queue<int> pushBuffer, int value)=>{

                    pushBuffer.Enqueue(value);


                    while (BufferA.Count() != 0 && BufferB.Count() != 0)
                    {
                        if (BufferA.Peek() < BufferB.Peek())
                            observer.OnNext(BufferA.Dequeue());
                        else
                            observer.OnNext(BufferB.Dequeue());
                    }
                };

                return new CompositeDisposable(
                    This.Subscribe(v => update(BufferA, v)),
                    other.Subscribe(v => update(BufferB, v)));

            });

    }

}

广告我的测试输出正在使用您的测试
Result StandardOutput:
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick

关于c# - 合并两个有序的 IObservable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13484587/

10-10 21:48