本文介绍了Rx.js等待回调完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Rx.js处理文件的内容,为每一行发出http请求,然后聚合结果。但是源文件包含数千行,我正在重载我正在执行http请求的远程http api。我需要确保在开始另一个http请求之前等待现有的http请求回调。我愿意一次批处理并执行 n 请求,但是对于这个脚本执行请求的脚本就足够了。

I am using Rx.js to process the contents of a file, make an http request for each line and then aggregate the results. However the source file contains thousands of lines and I am overloading the remote http api that I am performing the http request to. I need to make sure that I wait for the existing http request to callback before starting another one. I'd be open to batching and performing n requests at a time but for this script performing the requests in serial is sufficient.

我有以下内容:

const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');

const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
  process.nextTick(() => {
    callback('http response');
  });
});

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
  .flatMap(t => t.toString().split('\r\n'))
  .take(5)
  .concatMap(t => {
    console.log('Submitting request');

    return doHttpRequest(t);
  })
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

但是这不会串行执行http请求。它输出:

However this does not perform the http requests in serial. It outputs:


Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed

如果我删除对 concatAll()的调用,那么请求是串行的,但我的订阅函数在http请求返回之前看到了observable 。

If I remove the call to concatAll() then the requests are in serial but my subscribe function is seeing the observables before the http requests have returned.

如何连续执行HTTP请求,以便输出如下?

How can I perform the HTTP requests serially so that the output is as below?


Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed


推荐答案

这里的问题可能是当你使用 rx.Observable.fromCallback 时,你在参数中传递的函数是e立即行动起来。返回的observable将保留稍后传递给回调的值。为了更好地了解正在发生的事情,您应该使用稍微复杂的模拟:为您的请求编号,让它们返回您可以通过订阅观察到的实际(每个请求不同)结果。

The problem here is probably that when you use rx.Observable.fromCallback, the function you passed in argument is executed immediately. The observable returned will hold the value passed to the callback at a later point in time. To have a better view of what is happening, you should use a slightly more complex simulation : number your requests, have them return an actual (different for each request) result that you can observe through the subscription.

我在这里发生了什么:


  • take(5)发布5个值

  • map 发出5条日志消息,执行5个函数并传递5个observables

  • 这5个可观察量由 concatAll 处理,这些可观察量发出的值将按预期顺序排列。你在这里订购的是调用函数的结果,而不是对函数本身的调用。

  • take(5) issues 5 values
  • map issues 5 log messages, executes 5 functions and passes on 5 observables
  • those 5 observables are handled by concatAll and the values issued by those observables will be in order as expected. What you are ordering here is the result of the call to the functions, not the calls to the functions themselves.

实现你的目标,只有当 concatAll 订阅它时才需要调用你的observable工厂( rx.Observable.fromCallback )创作时间。为此你可以使用 defer

To achieve your aim, you need to call your observable factory (rx.Observable.fromCallback) only when concatAll subscribes to it and not at creation time. For that you can use defer : https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md

所以你的代码会变成:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
  .map(t => t.toString().split('\r\n'))
  .flatMap(t => t)
  .take(5)
  .map(t => {
    console.log('Submitting request');

    return Observable.defer(function(){return doHttpRequest(t);})
  })
  .concatAll()
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

您可以在此处看到类似问题,并提供了一个很好的解释:

You can see a similar issue with an excellent explanation here : How to start second observable *only* after first is *completely* done in rxjs

您的日志可能仍会连续显示5条提交请求消息。但是你的请求应该按照你的意愿一个接一个地执行。

Your log is likely to still show 5 consecutive 'Submitting request' messages. But your request should be executed one after the other has completed as you wish.

这篇关于Rx.js等待回调完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-16 07:03