我有一个简单的Web API,它返回一个Iobservable。我正在使用HttpClient获取Observable,以便可以订阅它。我的问题是订阅返回的Iobservable返回了“空”结果。

服务器

public IObservable<DataItem> GetDataItems()
{
    return Observable.Generate(0, i => i < 10, i => i + 1,
        i => new DataItem
        {
            Id = i,
            Name = String.Format("Storage{0}",i)
        });
}

客户

public IObservable<DataItem> GetDataItems()
{
    using (HttpClient apiClient = new HttpClient())
    {
        apiClient.BaseAddress = new Uri("http://localhost:9001");
        apiClient.DefaultRequestHeaders.Add("x-user-authentication", "xxxxxx");
        return apiClient
            .GetAsync("api/xxxx/yyyy").Result.Content
            .ReadAsAsync<DataItem>().ToObservable();
    }
}

var source = GetDataItems();

List<DataItem> items = new List<DataItem>();

IDisposable consoleSubscription = source.Subscribe(
            x => Console.WriteLine("{0}:{1}", x.Id, x.Name),
            ex => Console.WriteLine("OnError : {0} ", ex.Message),
            () =>  Console.WriteLine("Encountered End of Stream")
            );
        consoleSubscription.Dispose();

我的问题是我没有从服务器获取任何数据。我得到一个“空”的观察。我针对 Controller 编写了一个单元测试,它确实返回了数据项。

任何建议请帮忙。无法理解我要去哪里。服务器或客户端上没有错误。

最佳答案

您有点野心勃勃,希望IObservable<T>自动通过网络流式传输。恐怕WebAPI不会为您做到这一点。

您所看到的是默认json序列化程序输出IObservable<T>属性的结果-并没有,因此您得到了空括号。

您的单元测试之所以有效,是因为它全部在内存中-不会进行序列化/反序列化。

有多种方法可以使用HttpResponseMessage的StreamContent属性来输出结果,您可以将这些结果桥接到IObservable<T>或从IObservable<T>桥接-但这并不是真正的惯用WebApi。 WebApi的异步支持实际上是针对服务器上具有单项响应的请求的异步处理,而不是返回连续流事件。

最重要的是,我认为WebApi(至少在撰写本文时)是这里的错误技术选择。您最好看看SignalR,它是为这种情况而专门设计的,并包含在当前的ASP.NET版本中。它同时具有javascript和.NET客户端支持,您可以相当轻松地桥接到IObservable。已经有人对此进行了研究,例如in this post sporting example code

一些消息传递中间件,例如my-Channels Nirvana(编辑:自从Terracotta收购并包装到Universal Messaging中以来。在他们的文档中提供了一些example code。),以及CEP解决方案(例如SQL Server StreamInsight)也都具有现成的ojit_code支持。

关于asp.net-web-api - 如何从Web API获取IObservable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/20230561/

10-11 02:19