我正在建立一个从后台读取数据的网站。该数据是动态计算的,并以缓冲方式发送回客户端。也就是说,一旦计算出第一个块,它就被发送到客户端,然后它计算下一个块并将其发送到客户端。整个过程发生在同一个http请求中。客户机不应该等待完整的响应完成,而应该在发送后立即自行处理每个块。这些响应通常可以使用xhr进程处理程序(例如How to get progress from XMLHttpRequest)来使用。
我如何使用RXJS和Observables使用Angular2中的httpmodule使用这样的响应?
编辑:Peeskillet在下面给出了一个出色而详细的答案。此外,我做了进一步的挖掘,发现了afeature request for the HttpModule
of Angular和aStackOverflow question with another approach on how to solve it。
最佳答案
注:以下答案仅为POC。它的目的是教育HTTP的体系结构,并提供一个简单的工作PoC实现。我们应该查看XHRConnection
的源代码,了解在实现此功能时还应该考虑什么。
在尝试实现这个时,我看不到任何直接进入xhr的方法。看来我们可能需要提供使用Http
所涉及的一些组件的自定义实现。我们应该考虑的三个主要部分是Connection
ConnectionBackend
Http
Http
将ConnectionBackend
作为其构造函数的参数。当发出请求时,例如使用get
,Http
创建与ConnectionBackend.createConnection
的连接,并返回Observable
的Connection
属性(从createConnection
返回)。在最简化的视图中,它看起来像这样。
class XHRConnection implements Connection {
response: Observable<Response>;
constructor( request, browserXhr) {
this.response = new Observable((observer: Observer<Response>) => {
let xhr = browserXhr.create();
let onLoad = (..) => {
observer.next(new Response(...));
};
xhr.addEventListener('load', onLoad);
})
}
}
class XHRBackend implements ConnectionBackend {
constructor(private browserXhr) {}
createConnection(request): XHRConnection {
return new XHRConnection(request, this.broswerXhr).response;
}
}
class Http {
constructor(private backend: ConnectionBackend) {}
get(url, options): Observable<Response> {
return this.backend.createConnection(createRequest(url, options)).response;
}
}
因此,了解这个架构,我们可以尝试实现类似的东西。
对于
Connection
,这里是poc。为了简洁起见,没有导入,但在大多数情况下,所有内容都可以从@angular/http
导入,Observable/Observer
可以从rxjs/{Type}
导入。export class Chunk {
data: string;
}
export class ChunkedXHRConnection implements Connection {
request: Request;
response: Observable<Response>;
readyState: ReadyState;
chunks: Observable<Chunk>;
constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
this.request = req;
this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
let _xhr: XMLHttpRequest = browserXHR.build();
let previousLen = 0;
let onProgress = (progress: ProgressEvent) => {
let text = _xhr.responseText;
text = text.substring(previousLen);
chunkObserver.next({ data: text });
previousLen += text.length;
console.log(`chunk data: ${text}`);
};
_xhr.addEventListener('progress', onProgress);
_xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
_xhr.send(this.request.getBody());
return () => {
_xhr.removeEventListener('progress', onProgress);
_xhr.abort();
};
});
}
}
这是我们刚刚订阅的xhr
progress
事件。由于XHR.responseText
将整个连接的文本输出,我们只需substring
获取块,并通过Observer
发出每个chuck。对于
XHRBackend
,我们有以下内容(没有什么特别之处)。同样,一切都可以从@angular/http
导入;@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
constructor(
private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
private _xsrfStrategy: XSRFStrategy) {}
createConnection(request: Request): ChunkedXHRConnection {
this._xsrfStrategy.configureRequest(request);
return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
}
}
对于
Http
,我们将对其进行扩展,添加一个getChunks
方法。如果需要,可以添加更多方法。@Injectable()
export class ChunkedHttp extends Http {
constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
super(backend, defaultOptions);
}
getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
return this.backend.createConnection(
new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
}
}
mergeOptions
方法可以在Http
source中找到。现在我们可以为它创建一个模块。用户应该直接使用
ChunkedHttp
而不是Http
。但是,由于不试图覆盖Http
标记,如果需要,您仍然可以使用Http
。@NgModule({
imports: [ HttpModule ],
providers: [
{
provide: ChunkedHttp,
useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
return new ChunkedHttp(backend, options);
},
deps: [ ChunkedXHRBackend, RequestOptions ]
},
ChunkedXHRBackend
]
})
export class ChunkedHttpModule {
}
我们导入了cc,因为它提供了我们需要注入的其他服务,但是如果我们不需要的话,我们不想取消这些服务。
只需将
HttpModule
导入ChunkedHttpModule
。为了测试我使用了以下组件@Component({
selector: 'app',
encapsulation: ViewEncapsulation.None,
template: `
<button (click)="onClick()">Click Me!</button>
<h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
`,
styleUrls: ['./app.style.css']
})
export class App {
chunks: string[] = [];
constructor(private http: ChunkedHttp) {}
onClick() {
this.http.getChunks('http://localhost:8080/api/resource')
.subscribe(chunk => this.chunks.push(chunk.data));
}
}
我有一个后端端点设置,它每秒钟只吐出10个块中的“cc>”。这就是结果
好像什么地方有虫子。只有九个:-)。我认为这与服务器端有关。