我正在尝试使用.NET的Reactive Extensions重写一些代码,但是我需要一些有关如何实现目标的指南。

我有一个在低级库中封装一些异步行为的类。考虑一些可以读写网络的东西。当类启动时,它将尝试连接到环境,而在成功时,它将通过从工作线程进行调用来发回此信号。

我想将这种异步行为转换为同步调用,并且在下面创建了一个大大简化的示例,说明了如何实现此目的:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

在工作线程上运行AsyncStart只是模拟库的异步行为的一种方法,它不是我的真实代码的一部分,在我的真实代码中,低级库提供了线程并在回调中调用我的代码。

请注意,如果启动未在超时间隔内完成,Start方法将抛出TimeoutException

我想重写此代码以使用Rx。这是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

这是一个不错的尝试,但不幸的是,它包含了比赛条件。如果启动快速完成(例如,如果delay为0),并且在A点处存在附加延迟,则在执行OnNext之前,将在readySubject上调用First。本质上,我正在使用IObservableTimeoutFirst从未看到启动已完成,而是会抛出TimeoutException

似乎已创建Observable.Defer来处理此类问题。这是使用Rx的更为复杂的尝试:
Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

现在,异步操作不会立即开始,而仅在使用IObservable时才开始。不幸的是,仍然存在争用条件,但是这次是在B点。如果开始的异步操作在OnNext lambda返回之前调用Defer,它仍然会丢失,并且TimeoutException会抛出Timeout

我知道我可以使用像Replay这样的运算符来缓冲事件,但是我最初没有Rx的示例并不使用任何类型的缓冲。我有没有办法在没有比赛条件的情况下使用Rx解决我的问题?本质上,仅在将IObservable连接到TimeoutFirst之后才启动异步操作。

根据保罗·贝茨(Paul Betts)的回答,这里是可行的解决方案:
void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

有趣的部分是C点处的延迟时间长于AsyncStart完成所需的时间。 AsyncSubject保留最后发送的通知,并且TimeoutFirst仍将按预期执行。

最佳答案

因此,关于Rx的一件事,我想很多人一开始都会做(包括我自己!):如果您使用的是诸如ResetEvents,Thread.Sleeps之类的任何传统线程功能,或者您做错了(tm )-就像将内容强制转换为LINQ中的Array一样,因为您知道基础类型恰好是数组。

要知道的关键是,异步函数由返回IObservable<TResult>的函数表示-这是神奇的调味料,可让您在完成某些操作时发出信号。因此,这就是您如何像传统的Silverlight Web服务中那样“Rx-ify”一个更传统的异步函数:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}



这是AsyncSubject出现的地方-这确保即使asyncReaderFunc击败了订阅者,AsyncSubject仍将“重放”发生的事情。

因此,既然我们有了函数,就可以对它做很多有趣的事情:
// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();

09-11 18:50