我正在学习.net rx(reactive extensions)库,并试图创建一个从控制台读取用户输入的合适的可观察对象。
到目前为止,我发现:
public static IObservable<string> ConsoleInputObservable()
{
return Observable.Create<string>(observer =>
{
var cancelable = new BooleanDisposable();
while(!cancelable.IsDisposed)
{
observer.OnNext(Console.ReadLine());
}
observer.OnCompleted();
return cancelable;
});
}
不幸的是,这个实现至少有一个问题-没有办法取消订阅它。
所以我的问题是:如何正确地将一系列阻塞事件转换为可观测事件?
谢谢。
编辑:输入错误
最佳答案
干得好。
注意以下几点:
这允许用户提供一个适当的调度程序来控制并发运行的位置,并产生每个循环以防止它变得过于混乱(尽管在控制台上等待显然是非常糟糕的…)
在本例中,我们不应该调用OnCompleted
,因为终止的唯一方法是取消订阅,并且您在取消订阅后将尽量不发送任何进一步的消息
我们也不会在这里发送取消后通知。
代码如下:
public static IObservable<string> ConsoleInputObservable(
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
while(!ct.IsCancellationRequested)
{
var next = Console.ReadLine();
if(ct.IsCancellationRequested)
return;
o.OnNext(next);
await ctrl.Yield();
}
});
});
}
附录
@MartinLiverSage评论说,有多个订阅者的行为是不可取的——这促使了本附录。您可以简单地
OnNext
上面的代码,但是考虑到控制台的本质是一个应用程序只有一个线程,并且一次只能有一个线程在读取它,因此需要使用不同的方法。我忽略了上面这一点,因为我觉得问题可能更多的是关于线程方面,而不是控制台的性质。如果您真的对在控制台中输入的报告行感兴趣,那么下面这样的主循环可能更实用——这代表了对
Publish()
的合理使用。static void Main()
{
Subject<string> sc = new Subject<string>();
// kick off subscriptions here...
// Perhaps with `ObserveOn` if background processing is required
sc.Subscribe(x => Console.WriteLine("Subscriber1: " + x));
sc.Subscribe(x => Console.WriteLine("Subscriber2: " + x));
string input;
while((input = Console.ReadLine()) != "q")
{
sc.OnNext(input);
}
sc.OnCompleted();
Console.WriteLine("Finished");
}
关于c# - 如何将阻塞事件转换为可观察的?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29447049/