我正在建立一个从后台读取数据的网站。该数据是动态计算的,并以缓冲方式发送回客户端。也就是说,一旦计算出第一个块,它就被发送到客户端,然后它计算下一个块并将其发送到客户端。整个过程发生在同一个http请求中。客户机不应该等待完整的响应完成,而应该在发送后立即自行处理每个块。这些响应通常可以使用xhr进程处理程序(例如How to get progress from XMLHttpRequest)来使用。
我如何使用RXJS和Observables使用Angular2中的httpmodule使用这样的响应?
编辑:Peeskillet在下面给出了一个出色而详细的答案。此外,我做了进一步的挖掘,发现了afeature request for the HttpModule of Angular和aStackOverflow question with another approach on how to solve it

最佳答案

注:以下答案仅为POC。它的目的是教育HTTP的体系结构,并提供一个简单的工作PoC实现。我们应该查看XHRConnection的源代码,了解在实现此功能时还应该考虑什么。
在尝试实现这个时,我看不到任何直接进入xhr的方法。看来我们可能需要提供使用Http所涉及的一些组件的自定义实现。我们应该考虑的三个主要部分是
Connection
ConnectionBackend
Http
HttpConnectionBackend作为其构造函数的参数。当发出请求时,例如使用getHttp创建与ConnectionBackend.createConnection的连接,并返回ObservableConnection属性(从createConnection返回)。在最简化的视图中,它看起来像这样。

class XHRConnection implements Connection {
  response: Observable<Response>;
  constructor( request, browserXhr) {
    this.response = new Observable((observer: Observer<Response>) => {
      let xhr = browserXhr.create();
      let onLoad = (..) => {
        observer.next(new Response(...));
      };
      xhr.addEventListener('load', onLoad);
    })
  }
}

class XHRBackend implements ConnectionBackend {
  constructor(private browserXhr) {}
  createConnection(request): XHRConnection {
    return new XHRConnection(request, this.broswerXhr).response;
  }
}

class Http {
  constructor(private backend: ConnectionBackend) {}

  get(url, options): Observable<Response> {
    return this.backend.createConnection(createRequest(url, options)).response;
  }
}

因此,了解这个架构,我们可以尝试实现类似的东西。
对于Connection,这里是poc。为了简洁起见,没有导入,但在大多数情况下,所有内容都可以从@angular/http导入,Observable/Observer可以从rxjs/{Type}导入。
export class Chunk {
  data: string;
}

export class ChunkedXHRConnection implements Connection {
  request: Request;
  response: Observable<Response>;
  readyState: ReadyState;

  chunks: Observable<Chunk>;

  constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
    this.request = req;
    this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
      let _xhr: XMLHttpRequest = browserXHR.build();
      let previousLen = 0;
      let onProgress = (progress: ProgressEvent) => {
        let text = _xhr.responseText;
        text = text.substring(previousLen);
        chunkObserver.next({ data: text });
        previousLen += text.length;

        console.log(`chunk data: ${text}`);
      };
      _xhr.addEventListener('progress', onProgress);
      _xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
      _xhr.send(this.request.getBody());
      return () => {
        _xhr.removeEventListener('progress', onProgress);
        _xhr.abort();
      };
    });
  }
}

这是我们刚刚订阅的xhrprogress事件。由于XHR.responseText将整个连接的文本输出,我们只需substring获取块,并通过Observer发出每个chuck。
对于XHRBackend,我们有以下内容(没有什么特别之处)。同样,一切都可以从@angular/http导入;
@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
  constructor(
      private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
      private _xsrfStrategy: XSRFStrategy) {}

  createConnection(request: Request): ChunkedXHRConnection {
    this._xsrfStrategy.configureRequest(request);
    return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
  }
}

对于Http,我们将对其进行扩展,添加一个getChunks方法。如果需要,可以添加更多方法。
@Injectable()
export class ChunkedHttp extends Http {
  constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
    super(backend, defaultOptions);
  }

  getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
    return this.backend.createConnection(
       new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
  }
}

mergeOptions方法可以在Http source中找到。
现在我们可以为它创建一个模块。用户应该直接使用ChunkedHttp而不是Http。但是,由于不试图覆盖Http标记,如果需要,您仍然可以使用Http
@NgModule({
  imports: [ HttpModule ],
  providers: [
    {
      provide: ChunkedHttp,
      useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
        return new ChunkedHttp(backend, options);
      },
      deps: [ ChunkedXHRBackend, RequestOptions ]
    },
    ChunkedXHRBackend
  ]
})
export class ChunkedHttpModule {
}

我们导入了cc,因为它提供了我们需要注入的其他服务,但是如果我们不需要的话,我们不想取消这些服务。
只需将HttpModule导入ChunkedHttpModule。为了测试我使用了以下组件
@Component({
  selector: 'app',
  encapsulation: ViewEncapsulation.None,
  template: `
    <button (click)="onClick()">Click Me!</button>
    <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
  `,
  styleUrls: ['./app.style.css']
})
export class App {
  chunks: string[] = [];

  constructor(private http: ChunkedHttp) {}

  onClick() {
    this.http.getChunks('http://localhost:8080/api/resource')
      .subscribe(chunk => this.chunks.push(chunk.data));
  }
}

我有一个后端端点设置,它每秒钟只吐出10个块中的“cc>”。这就是结果
angular - 使用Angular2/RxJS读取缓冲的响应-LMLPHP
好像什么地方有虫子。只有九个:-)。我认为这与服务器端有关。

10-05 20:54
查看更多