本文介绍了Rx.js 等待回调完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Rx.js 来处理文件的内容,为每一行发出一个 http 请求,然后聚合结果.但是,源文件包含数千行,并且我正在重载正在执行 http 请求的远程 http api.我需要确保在开始另一个请求之前等待现有的 http 请求回调.我愿意一次批处理和执行 n 个请求,但对于这个脚本,串行执行请求就足够了.

我有以下几点:

const fs = require('fs');const rx = require('rx');const rxNode = require('rx-node');const doHttpRequest = rx.Observable.fromCallback((params, callback) => {process.nextTick(() => {callback('http 响应');});});rxNode.fromReadableStream(fs.createReadStream('./source-file.txt')).flatMap(t => t.toString().split('
')).take(5).concatMap(t => {console.log('提交请求');返回 doHttpRequest(t);}).订阅(结果 => {控制台日志(结果);}, 错误 =>{控制台错误('错误',错误);}, () =>{console.log('完成');});

然而,这不会串行执行 http 请求.它输出:

提交请求提交请求提交请求提交请求提交请求http 响应http 响应http 响应http 响应http 响应完全的

如果我删除对 concatAll() 的调用,那么请求是串行的,但我的订阅函数在 http 请求返回之前看到了可观察值.

如何串行执行 HTTP 请求,使输出如下所示?

提交请求http 响应提交请求http 响应提交请求http 响应提交请求http 响应提交请求http 响应完全的
解决方案

这里的问题可能是当你使用 rx.Observable.fromCallback 时,你传入参数的函数会立即执行.返回的 observable 将保存稍后传递给回调的值.为了更好地了解正在发生的事情,您应该使用稍微复杂一些的模拟:对您的请求进行编号,让它们返回您可以通过订阅观察到的实际(每个请求不同)结果.

我的假设发生在这里:

  • take(5) 发出 5 个值
  • map 发出 5 个日志消息,执行 5 个函数并传递 5 个可观察对象
  • 这 5 个 observable 由 concatAll 处理,这些 observable 发出的值将按预期顺序排列.您在此处订购的是对函数调用的结果,而不是对函数本身的调用.

为了实现您的目标,您需要仅在 concatAll 订阅它时而不是在创建时调用您的可观察工厂 (rx.Observable.fromCallback).为此,您可以使用 defer :https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

所以你的代码会变成:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file')).map(t => t.toString().split('
')).flatMap(t => t).take(5).map(t => {console.log('提交请求');return Observable.defer(function(){return doHttpRequest(t);})}).concatAll().订阅(结果 => {控制台日志(结果);}, 错误 =>{控制台错误('错误',错误);}, () =>{console.log('完成');});

您可以在这里看到类似的问题,并有很好的解释:如何在 rxjs 中 *完全* 完成第一个 * 之后 * 启动第二个可观察对象

您的日志可能仍会显示 5 条连续的提交请求"消息.但是您的请求应该按照您的意愿在另一个完成后执行.

I am using Rx.js to process the contents of a file, make an http request for each line and then aggregate the results. However the source file contains thousands of lines and I am overloading the remote http api that I am performing the http request to. I need to make sure that I wait for the existing http request to callback before starting another one. I'd be open to batching and performing n requests at a time but for this script performing the requests in serial is sufficient.

I have the following:

const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');

const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
  process.nextTick(() => {
    callback('http response');
  });
});

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
  .flatMap(t => t.toString().split('
'))
  .take(5)
  .concatMap(t => {
    console.log('Submitting request');

    return doHttpRequest(t);
  })
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

However this does not perform the http requests in serial. It outputs:

Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed

If I remove the call to concatAll() then the requests are in serial but my subscribe function is seeing the observables before the http requests have returned.

How can I perform the HTTP requests serially so that the output is as below?

Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed
解决方案

The problem here is probably that when you use rx.Observable.fromCallback, the function you passed in argument is executed immediately. The observable returned will hold the value passed to the callback at a later point in time. To have a better view of what is happening, you should use a slightly more complex simulation : number your requests, have them return an actual (different for each request) result that you can observe through the subscription.

What I posit happens here :

  • take(5) issues 5 values
  • map issues 5 log messages, executes 5 functions and passes on 5 observables
  • those 5 observables are handled by concatAll and the values issued by those observables will be in order as expected. What you are ordering here is the result of the call to the functions, not the calls to the functions themselves.

To achieve your aim, you need to call your observable factory (rx.Observable.fromCallback) only when concatAll subscribes to it and not at creation time. For that you can use defer : https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

So your code would turn into :

rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
  .map(t => t.toString().split('
'))
  .flatMap(t => t)
  .take(5)
  .map(t => {
    console.log('Submitting request');

    return Observable.defer(function(){return doHttpRequest(t);})
  })
  .concatAll()
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

You can see a similar issue with an excellent explanation here : How to start second observable *only* after first is *completely* done in rxjs

Your log is likely to still show 5 consecutive 'Submitting request' messages. But your request should be executed one after the other has completed as you wish.

这篇关于Rx.js 等待回调完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-16 07:03