我正在从我们的 ASP.NET 站点中删除我们的电子邮件系统,该站点过去常常使用系统立即发送电子邮件以处理单独服务中的请求,以减少网站上的工作量。我正在尝试围绕一组接口(interface)设计它,以便我可以根据需要交换实现,但最初它将基于消息队列 (MSMQ) 将请求发送到队列,让服务接收传入的请求然后处理它们。我目前大致定义了以下接口(interface):
// Sends one or more requests to be processed somehow
public interface IRequestSender
{
void Send(IEnumerable<Request> requests);
}
// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
void Start();
void Stop();
}
// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}
您会注意到 Listener 和 Processor 使用 observable 模式,因为我认为这似乎最适合。
我的问题是弄清楚如何编写从 MSMQ 接收的
IRequestListener
的实现,基本上如何创建合适的 IObservable<T>
?我发现的第一个选择是根据 MSDN documentation 给出的示例从头开始创建一个
IObservable<T>
,但这似乎有很多管道工作要做。另一种选择是使用 Reactive Extensions,因为它似乎旨在使创建 observables 更容易。我发现最接近使用 Rx 和 MSMQ 的是这些页面:
但我不确定如何将这些示例应用于我的
IRequestListener
接口(interface)。也欢迎任何其他想法,如果合适,甚至可以更改我的基本设计。
最佳答案
我最初确实使用了 FromAsyncPattern,但后来为它编写了一个类,因为它可以更好地处理超时和中毒消息。一旦开始,队列无论如何都是热门的 Observable。您还可以使用 Observable.Defer
使其更接近 Rx 而不是 Start/Stop。
这是 QueueObservable 的基本实现。您可以从调用 ListenReceive
开始。
Subject<T> Subject = new Subject<T>();
protected void ListenReceive()
{
Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
}
protected void OnReceive(IAsyncResult ar)
{
Message message = null;
try
{
message = Queue.EndReceive(ar);
}
catch (TimeoutException ex)
{
//retry?
}
if (message != null)
Subject.OnNext((T) message.Body);
Thread.Yield();
if (!IsDisposed)
ListenReceive();
}
public IObservable<T> AsObservable()
{
return Subject;
}
关于.net - 如何创建从 MSMQ 消息队列读取的 IObservable<T>?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/9191410/