Angular 7 docs提供了rxjs Observable s在实现AJAX请求的指数补偿时的实际用法示例:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

虽然我了解了Observable和backoff的概念,但我还不太清楚,retryWhen将如何精确计算出重新订阅源ajax的时间间隔。

具体来说, zip map mapMerge 在此设置中如何工作?

attempts对象发射到retryWhen中时,它将包含什么?

我浏览了他们的引用页,但仍然无法解决这个问题。

最佳答案

我花了很多时间对此进行研究(出于学习目的),并将尝试尽可能全面地解释此代码的工作原理。

首先,这是原始代码,带注释:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}
  • 很简单,我们在backoff运算符之外创建了自定义retryWhen运算符。我们稍后将可以在pipe函数中应用此功能。
  • 在这种情况下,pipe方法返回一个自定义运算符。
  • 我们的自定义运算符将是修改后的retryWhen运算符。它带有一个函数参数。该函数将被调用一次-特别是在首次遇到/调用此retryWhen时。顺便说一下,retryWhen仅在可观察的源产生错误时才起作用。然后,它可以防止错误进一步传播并重新订阅源。如果源产生非错误结果(无论是第一次订阅还是重试),retryWhen将被传递并且不涉及。

    关于attempts的几句话。这是可以观察的。这不是可观察到的源。它是专门为retryWhen创建的。它只有一种用途:只有可观察到的对源可观察项的订阅(或重新订阅)导致错误时,attempts会触发next。我们获得了attempts并可以自由使用它,以便以某种方式对可观察到的源订阅的每次失败尝试使用react。

    这就是我们要做的。

    首先,我们创建range(1, maxTries),它是一个可观察的对象,对于我们愿意执行的每次重试,它都有一个整数。 range准备随时随地发射所有数字,但是我们必须坚守一道:只有在发生其他重试时,我们才需要一个新数字。因此,这就是我们...
  • ...用attempts压缩它。含义,将每个attempts的发射值与单个range值相结合。

    请记住,我们当前使用的函数将仅被调用一次,到那时,对于最初的失败订阅,attempts将仅触发一次next。因此,在这一点上,我们的两个压缩观测值仅产生了一个值。

    顺便说一句,将两个可观察值压缩成一个的值是多少?该函数确定:(i) => i。为了清楚起见,可以将其编写为(itemFromRange, itemFromAttempts) => itemFromRange。第二个参数未使用,因此被删除,第一个被重命名为i

    这里发生的是,我们只是无视attempts触发的值,我们只对它们被触发的事实感兴趣。每当发生这种情况时,我们都会从range observable中提取下一个值...
  • ...并将其平方。这是用于指数补偿的指数部分。

    因此,现在无论何时(重新)订阅源失败,我们手中的整数都会不断增加(1、4、9、16 ...)。我们如何将该整数转换为时间延迟,直到下一次重新订阅?

    记住,我们当前内置的此函数必须使用attempts作为输入返回一个可观察值。此结果可观察到的对象仅构建一次。然后retryWhen订阅该结果可观察到的内容;并且:每当结果可观察到的结果触发next时,重试订阅源可观察的内容;每当可观察到的结果触发那些相应事件时,在可观察到的源上调用completeerror
  • 长话短说,我们需要让retryWhen等待一段时间。可以使用 delay 运算符,但是设置延迟的指数增长可能会很痛苦。而是mergeMap运算符起作用。
    mergeMap是组合了两个运算符的快捷方式:mapmergeAllmap只是将每个递增的整数(1、4、9、16 ...)转换为可观察到的timer,它会在经过毫秒数后触发nextmergeAll强制retryWhen实际订阅timer。如果最后一点没有发生,我们得到的observable将立即以next observable实例作为值触发timer
  • 至此,我们已经构建了自定义的Observable,retryWhen将使用它来决定何时确切尝试重新订阅源Observable。

  • 就目前而言,我看到此实现存在两个问题:
  • 一旦我们生成的可观察对象触发了最后一个next(导致了最后一次尝试重新订阅),它也会立即触发complete。除非源可观察到的返回结果非常迅速(假设最后一次重试将是成功的),否则该结果将被忽略。

    这是因为retryWhen一旦从我们的观察对象中听到complete,就会在源代码上调用complete,这可能仍在发出AJAX请求的过程中。
  • 如果所有重试均未成功,则源实际上会调用complete而不是更合乎逻辑的error

  • 为了解决这两个问题,我认为在给最后一次重试提供一些合理的时间来尝试完成其工作之后,最终得到的observable应该在最后触发error

    这是我对上述修复程序的实现,其中还考虑了在最新的zip中弃用rxjs v6运算符:
    import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
    import { concat, pipe, range, throwError, timer, zip } from "rxjs";
    
    function backoffImproved(maxTries, ms) {
        return pipe(
            retryWhen(attempts => {
                const observableForRetries =
                    zip(range(1, maxTries), attempts)
                        .pipe(
                            map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                            map(i => i * i),
                            switchMap(i => timer(i * ms))
                        );
                const observableForFailure =
                    throwError(new Error('Could not complete AJAX request'))
                        .pipe(
                            materialize(),
                            delay(1000),
                            dematerialize()
                        );
                return concat(observableForRetries, observableForFailure);
            })
        );
    }
    

    我测试了这段代码,它似乎在所有情况下都能正常工作。我现在不愿意详细解释它;我怀疑有人还会读上面的文字墙。

    无论如何,非常感谢@BenjaminGruenbaum和@cartant将我设置在正确的路径上,以解决所有这些问题。

    关于javascript - 使用rxjs实现指数补偿,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53015170/

    10-10 02:01