我正在学习.net rx(reactive extensions)库,并试图创建一个从控制台读取用户输入的合适的可观察对象。
到目前为止,我发现:

    public static IObservable<string> ConsoleInputObservable()
    {
        return Observable.Create<string>(observer =>
        {
            var cancelable = new BooleanDisposable();
            while(!cancelable.IsDisposed)
            {
                observer.OnNext(Console.ReadLine());
            }
            observer.OnCompleted();
            return cancelable;
        });
    }

不幸的是,这个实现至少有一个问题-没有办法取消订阅它。
所以我的问题是:如何正确地将一系列阻塞事件转换为可观测事件?
谢谢。
编辑:输入错误

最佳答案

干得好。
注意以下几点:
这允许用户提供一个适当的调度程序来控制并发运行的位置,并产生每个循环以防止它变得过于混乱(尽管在控制台上等待显然是非常糟糕的…)
在本例中,我们不应该调用OnCompleted,因为终止的唯一方法是取消订阅,并且您在取消订阅后将尽量不发送任何进一步的消息
我们也不会在这里发送取消后通知。
代码如下:

public static IObservable<string> ConsoleInputObservable(
    IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<string>(o =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            while(!ct.IsCancellationRequested)
            {
                var next = Console.ReadLine();
                if(ct.IsCancellationRequested)
                    return;

                o.OnNext(next);
                await ctrl.Yield();
            }
        });
    });
}

附录
@MartinLiverSage评论说,有多个订阅者的行为是不可取的——这促使了本附录。您可以简单地OnNext上面的代码,但是考虑到控制台的本质是一个应用程序只有一个线程,并且一次只能有一个线程在读取它,因此需要使用不同的方法。
我忽略了上面这一点,因为我觉得问题可能更多的是关于线程方面,而不是控制台的性质。如果您真的对在控制台中输入的报告行感兴趣,那么下面这样的主循环可能更实用——这代表了对Publish()的合理使用。
static void Main()
{
    Subject<string> sc = new Subject<string>();

    // kick off subscriptions here...
    // Perhaps with `ObserveOn` if background processing is required
    sc.Subscribe(x => Console.WriteLine("Subscriber1: " + x));
    sc.Subscribe(x => Console.WriteLine("Subscriber2: " + x));

    string input;
    while((input = Console.ReadLine()) != "q")
    {
        sc.OnNext(input);
    }
    sc.OnCompleted();

    Console.WriteLine("Finished");
}

关于c# - 如何将阻塞事件转换为可观察的?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29447049/

10-11 00:59
查看更多