我有一个工作线程正在阻止调用(ReadFrame)
从套接字(IO绑定(bind))读取传入的数据。
线程运行一个循环,将数据输入Subject
消费者可以观察到的。

private void ReadLoop()
{
    while (!IsDisposed)
    {
        var frame = _Socket.ReadFrame();
        _ReceivedFrames.OnNext(frame);
    }
}

我想知道是否还有更多的RX方式可以做到这一点。

这是我进行的尝试(玩具示例):
var src = Observable
         .Repeat(Unit.Default)
         .Select(_ =>
         {
             Thread.Sleep(1000);              // simulated blocking ReadFrame call
             return "data read from socket";
         })
         .SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
         .ObserveOn(NewThreadScheduler.Default)     // spin up new thread (?)
         .Publish()
         .RefCount();

var d = src.Subscribe(s => s.Dump()); // simulated consumer

Console.ReadLine();  // simulated main running code

d.Dispose();  // tear down

我正在努力正确使用ObserveOnSubscribeOn和调度程序。
玩具示例似乎有效,但是我不确定线程​​的生存期是否得到正确管理。

阅读器线程是否通过d.Dispose()调用关闭了?
我是否需要创建一个新线程?
我应该改用Observable.Create吗?如何?


ReadLoop()方法是符合以下接口(interface)的类的一部分:
public interface ICanSocket : IDisposable
{
    IObservable<CanFrame> ReceivedFrames { get; }
    IObserver<CanFrame>   FramesToSend   { get; }
}

处置父_Socket时,将处置其成员ICanSocket

最佳答案

实现此目的的最“Rxy”方法是使用Rxx,它具有可观察到的用于执行异步I/O的方法。

似乎您最关心的是:

  • 订阅时,请勿阻止订户线程(也可以在后台线程上运行I/O线程)
  • 当调用者退订时,停止I/O线程

  • 解决这些问题的一种方法是只使用async Create方法:
    // just use Task.Run to "background" the work
    var src = Observable
        .Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var frame = _Socket.ReadFrame();
                if (frame == null) // end of stream?
                {
                    // will send a Completed event
                    return;
                }
    
                observer.OnNext(frame);
            }
        }));
    
    var d = src.Subscribe(s => s.Dump());
    Console.ReadLine();
    d.Dispose();
    

    09-06 21:45