我有一个工作线程正在阻止调用(ReadFrame
)
从套接字(IO绑定(bind))读取传入的数据。
线程运行一个循环,将数据输入Subject
,
消费者可以观察到的。
private void ReadLoop()
{
while (!IsDisposed)
{
var frame = _Socket.ReadFrame();
_ReceivedFrames.OnNext(frame);
}
}
我想知道是否还有更多的RX方式可以做到这一点。
这是我进行的尝试(玩具示例):
var src = Observable
.Repeat(Unit.Default)
.Select(_ =>
{
Thread.Sleep(1000); // simulated blocking ReadFrame call
return "data read from socket";
})
.SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
.ObserveOn(NewThreadScheduler.Default) // spin up new thread (?)
.Publish()
.RefCount();
var d = src.Subscribe(s => s.Dump()); // simulated consumer
Console.ReadLine(); // simulated main running code
d.Dispose(); // tear down
我正在努力正确使用
ObserveOn
,SubscribeOn
和调度程序。玩具示例似乎有效,但是我不确定线程的生存期是否得到正确管理。
阅读器线程是否通过
d.Dispose()
调用关闭了?我是否需要创建一个新线程?
我应该改用
Observable.Create
吗?如何?ReadLoop()
方法是符合以下接口(interface)的类的一部分:public interface ICanSocket : IDisposable
{
IObservable<CanFrame> ReceivedFrames { get; }
IObserver<CanFrame> FramesToSend { get; }
}
处置父
_Socket
时,将处置其成员ICanSocket
。 最佳答案
实现此目的的最“Rxy”方法是使用Rxx,它具有可观察到的用于执行异步I/O的方法。
似乎您最关心的是:
解决这些问题的一种方法是只使用async Create方法:
// just use Task.Run to "background" the work
var src = Observable
.Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
{
while (!cancellationToken.IsCancellationRequested)
{
var frame = _Socket.ReadFrame();
if (frame == null) // end of stream?
{
// will send a Completed event
return;
}
observer.OnNext(frame);
}
}));
var d = src.Subscribe(s => s.Dump());
Console.ReadLine();
d.Dispose();