我正在尝试使用.NET的Reactive Extensions重写一些代码,但是我需要一些有关如何实现目标的指南。
我有一个在低级库中封装一些异步行为的类。考虑一些可以读写网络的东西。当类启动时,它将尝试连接到环境,而在成功时,它将通过从工作线程进行调用来发回此信号。
我想将这种异步行为转换为同步调用,并且在下面创建了一个大大简化的示例,说明了如何实现此目的:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
在工作线程上运行
AsyncStart
只是模拟库的异步行为的一种方法,它不是我的真实代码的一部分,在我的真实代码中,低级库提供了线程并在回调中调用我的代码。请注意,如果启动未在超时间隔内完成,
Start
方法将抛出TimeoutException
。我想重写此代码以使用Rx。这是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
这是一个不错的尝试,但不幸的是,它包含了比赛条件。如果启动快速完成(例如,如果
delay
为0),并且在A点处存在附加延迟,则在执行OnNext
之前,将在readySubject
上调用First
。本质上,我正在使用IObservable
和Timeout
的First
从未看到启动已完成,而是会抛出TimeoutException
。似乎已创建
Observable.Defer
来处理此类问题。这是使用Rx的更为复杂的尝试:Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
现在,异步操作不会立即开始,而仅在使用
IObservable
时才开始。不幸的是,仍然存在争用条件,但是这次是在B点。如果开始的异步操作在OnNext
lambda返回之前调用Defer
,它仍然会丢失,并且TimeoutException
会抛出Timeout
。我知道我可以使用像
Replay
这样的运算符来缓冲事件,但是我最初没有Rx的示例并不使用任何类型的缓冲。我有没有办法在没有比赛条件的情况下使用Rx解决我的问题?本质上,仅在将IObservable
连接到Timeout
和First
之后才启动异步操作。根据保罗·贝茨(Paul Betts)的回答,这里是可行的解决方案:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
有趣的部分是C点处的延迟时间长于
AsyncStart
完成所需的时间。 AsyncSubject
保留最后发送的通知,并且Timeout
和First
仍将按预期执行。 最佳答案
因此,关于Rx的一件事,我想很多人一开始都会做(包括我自己!):如果您使用的是诸如ResetEvents,Thread.Sleeps之类的任何传统线程功能,或者您做错了(tm )-就像将内容强制转换为LINQ中的Array一样,因为您知道基础类型恰好是数组。
要知道的关键是,异步函数由返回IObservable<TResult>
的函数表示-这是神奇的调味料,可让您在完成某些操作时发出信号。因此,这就是您如何像传统的Silverlight Web服务中那样“Rx-ify”一个更传统的异步函数:
IObservable<byte[]> readFromNetwork()
{
var ret = new AsyncSubject();
// Here's a traditional async function that you provide a callback to
asyncReaderFunc(theFile, buffer => {
ret.OnNext(buffer);
ret.OnCompleted();
});
return ret;
}
这是
AsyncSubject
出现的地方-这确保即使asyncReaderFunc击败了订阅者,AsyncSubject仍将“重放”发生的事情。因此,既然我们有了函数,就可以对它做很多有趣的事情:
// Make it into a sync function
byte[] results = readFromNetwork().First();
// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})
// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.Subscribe(ms => {
Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
});
// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
.Repeat().TakeUntil(x => x == null || x.Length == 0)
.Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
.First();