原文:Comprehensive Guide to Higher-Order RxJs Mapping Operators: switchMap, mergeMap, concatMap (and exhaustMap)

我们日常发现的一些最常用的 RxJs 操作符是 RxJs 高阶映射操作符:switchMap、mergeMap、concatMap 和exhaustMap。

例如,我们程序中的大部分网络调用都将使用这些运算符之一完成,因此熟悉它们对于编写几乎所有反应式程序至关重要。

知道在给定情况下使用哪个运算符(以及为什么)可能有点令人困惑,我们经常想知道这些运算符是如何真正工作的,以及为什么它们会这样命名。

这些运算符可能看起来不相关,但我们真的很想一口气学习它们,因为选择错误的运算符可能会意外地导致我们程序中的微妙问题。

Why are the mapping operators a bit confusing?

这样做是有原因的:为了理解这些操作符,我们首先需要了解每个内部使用的 Observable 组合策略。

与其试图自己理解switchMap,不如先了解什么是Observable切换; 我们需要先学习 Observable 连接等,而不是直接深入 concatMap。

这就是我们在这篇文章中要做的事情,我们将按逻辑顺序学习 concat、merge、switch 和exhaust 策略及其对应的映射运算符:concatMap、mergeMap、switchMap 和exhaustMap。

我们将结合使用 marble 图和一些实际示例(包括运行代码)来解释这些概念。

最后,您将确切地知道这些映射运算符中的每一个是如何工作的,何时使用,为什么使用,以及它们名称的原因。

The RxJs Map Operator

让我们从头开始,介绍这些映射运算符的一般作用。

正如运算符的名称所暗示的那样,他们正在做某种映射:但究竟是什么被映射了? 我们先来看看 RxJs Map 操作符的弹珠图:

How the base Map Operator works

使用 map 运算符,我们可以获取输入流(值为 1、2、3),并从中创建派生的映射输出流(值为 10、20、30)。

底部输出流的值是通过获取输入流的值并将它们应用到一个函数来获得的:这个函数只是将这些值乘以 10。

所以 map 操作符就是映射输入 observable 的值。 以下是我们如何使用它来处理 HTTP 请求的示例:

const http$ : Observable<Course[]> = this.http.get('/api/courses');

http$
    .pipe(
        tap(() => console.log('HTTP request executed')),
        map(res => Object.values(res['payload']))
    )
    .subscribe(
        courses => console.log("courses", courses)
    );

在这个例子中,我们正在创建一个 HTTP observable 来进行后端调用,我们正在订阅它。 observable 将发出后端 HTTP 响应的值,它是一个 JSON 对象。

在这种情况下,HTTP 响应将数据包装在有效负载属性中,因此为了获取数据,我们应用了 RxJs 映射运算符。 然后映射函数将映射 JSON 响应负载并提取负载属性的值。

既然我们已经回顾了基本映射的工作原理,现在让我们来谈谈高阶映射。

What is Higher-Order Observable Mapping?

在高阶映射中,我们不是将像 1 这样的普通值映射到另一个像 10 这样的值,而是将一个值映射到一个 Observable 中!

结果是一个高阶的 Observable。 它只是一个 Observable,但它的值本身也是 Observable,我们可以单独订阅。

这听起来可能有些牵强,但实际上,这种类型的映射一直在发生。 让我们举一个这种类型映射的实际例子。 假设例如,我们有一个 Angular Reactive Form,它通过 Observable 随时间发出有效的表单值:

@Component({
    selector: 'course-dialog',
    templateUrl: './course-dialog.component.html'
})
export class CourseDialogComponent implements AfterViewInit {

    form: FormGroup;
    course:Course;

    @ViewChild('saveButton') saveButton: ElementRef;

    constructor(
        private fb: FormBuilder,
        private dialogRef: MatDialogRef<CourseDialogComponent>,
        @Inject(MAT_DIALOG_DATA) course:Course) {

        this.course = course;

        this.form = fb.group({
            description: [course.description,
                          Validators.required],
            category: [course.category, Validators.required],
            releasedAt: [moment(), Validators.required],
            longDescription: [course.longDescription,
                              Validators.required]
        });
    }
}

Reactive Form 提供了一个 Observable this.form.valueChanges,它在用户与表单交互时发出最新的表单值。 这将是我们的源 Observable。

我们想要做的是在这些值随着时间的推移发出时至少保存其中一些值,以实现表单草稿预保存功能。 这样,随着用户填写表单,数据会逐渐保存,从而避免由于意外重新加载而丢失整个表单数据。

Why Higher-Order Observables?

为了实现表单草稿保存功能,我们需要获取表单值,然后创建第二个执行后端保存的 HTTP observable,然后订阅它。

我们可以尝试手动完成所有这些,但是我们会陷入嵌套的订阅反模式:

this.form.valueChanges
    .subscribe(
       formValue => {

           const httpPost$ =
                 this.http.put(`/api/course/${courseId}`, formValue);

           httpPost$.subscribe(
               res => ... handle successful save ...
               err => ... handle save error ...
           );

       }
    );

正如我们所见,这会导致我们的代码很快在多个级别嵌套,这是我们在使用 RxJs 时首先要避免的问题之一。

让我们称这个新的 httpPost$ Observable 为内部 Observable,因为它是在内部嵌套代码块中创建的。

Avoiding nested subscriptions

我们希望以更方便的方式完成所有这些过程:我们希望获取表单值,并将其映射到保存 Observable 中。 这将有效地创建一个高阶 Observable,其中每个值对应一个保存请求。

然后我们希望透明地订阅这些网络 Observable 中的每一个,并且一次性直接接收网络响应,以避免任何嵌套。

如果我们有某种更高阶的 RxJs 映射运算符,我们就可以做到这一切! 那为什么我们需要四个不同的操作符呢?

为了理解这一点,想象一下如果 valueChanges observable 快速连续发出多个表单值并且保存操作需要一些时间来完成,会发生什么情况:

  • 我们应该等待一个保存请求完成后再进行另一次保存吗?
  • 我们应该并行进行多次保存吗?
  • 我们应该取消正在进行的保存并开始新的保存吗?
  • 当一个已经在进行中时,我们应该忽略新的保存尝试吗?

在探索这些用例中的每一个之前,让我们回到上面的嵌套订阅代码。

在嵌套订阅示例中,我们实际上是并行触发保存操作,这不是我们想要的,因为没有强有力的保证后端将按顺序处理保存,并且最后一个有效的表单值确实是存储在 后端。

让我们看看如何确保仅在上一次保存完成后才完成保存请求。

Understanding Observable Concatenation

为了实现顺序保存,我们将引入 Observable 连接的新概念。 在此代码示例中,我们使用 concat() RxJs 函数连接两个示例 observable:

const series1$ = of('a', 'b');

const series2$ = of('x', 'y');

const result$ = concat(series1$, series2$);

result$.subscribe(console.log);

在使用 of 创建函数创建了两个 Observables series1$ 和 series2$ 之后,我们创建了第三个 result$ Observable,它是串联 series1$ 和 series2$ 的结果。

这是该程序的控制台输出,显示了结果 Observable 发出的值:

a
b
x
y

如我们所见,这些值是将 series1$ 的值与 series2$ 的值连接在一起的结果。 但这里有一个问题:这个例子能工作的原因是因为这些 Observable 正在完成!!

of() 函数将创建 Observables,它发出传递给 of() 的值,然后在发出所有值后完成 Observables。

Observable Concatenation Marble Diagram

你注意到第一个 Observable 的值 b 后面的竖线了吗?这标志着第一个具有值 a 和 b (series1$) 的 Observable 完成的时间点。

让我们按照时间表逐步分解这里发生的事情:

  • 两个 Observables series1$ 和 series2$ 被传递给 concat() 函数
  • concat() 然后将订阅第一个 Observable series1$,但不会订阅第二个 Observable series2$(这对于理解串联至关重要)
  • source1$ 发出值 a,该值立即反映在输出 result$ Observable 中
  • 注意 source2$ Observable 还没有发出值,因为它还没有被订阅
  • 然后 source1$ 将发出 b 值,该值反映在输出中
  • 然后 source1$ 将完成,只有在此之后 concat() 现在订阅 source2$
  • 然后 source2$ 值将开始反映在输出中,直到 source2$ 完成
  • 当 source2$ 完成时, result$ Observable 也将完成
  • 请注意,我们可以将任意数量的 Observable 传递给 concat(),而不仅仅是本示例中的两个

The key point about Observable Concatenation

正如我们所看到的,Observable 连接就是关于 Observable 的完成! 我们取第一个 Observable 并使用它的值,等待它完成,然后我们使用下一个 Observable,依此类推,直到所有 Observable 完成。

回到我们的高阶 Observable 映射示例,让我们看看串联的概念如何帮助我们。

Using Observable Concatenation to implement sequential saves

正如我们所见,为了确保我们的表单值按顺序保存,我们需要获取每个表单值并将其映射到 httpPost$ Observable。

然后我们需要订阅它,但我们希望在订阅下一个 httpPost$ Observable 之前完成保存。

In order to ensure sequentiality, we need to concatenate the multiple httpPost$ Observables together!

然后我们将订阅每个 httpPost$ 并按顺序处理每个请求的结果。 最后,我们需要的是一个混合了以下内容的运算符:

  • 一个高阶映射操作(获取表单值并将其转换为 httpPost$ Observable)
  • 使用 concat() 操作,将多个 httpPost$ Observables 连接在一起以确保在前一个正在进行的保存首先完成之前不会进行下一个 HTTP 保存。

我们需要的是恰当命名的 RxJs concatMap Operator,它混合了高阶映射和 Observable 连接。

The RxJs concatMap Operator

代码如下:

this.form.valueChanges
    .pipe(
        concatMap(formValue => this.http.put(`/api/course/${courseId}`,
                                             formValue))
    )
    .subscribe(
       saveResult =>  ... handle successful save ...,
        err => ... handle save error ...
    );

正如我们所见,使用像 concatMap 这样的高阶映射运算符的第一个好处是现在我们不再有嵌套订阅。

通过使用 concatMap,现在所有表单值都将按顺序发送到后端,如 Chrome DevTools Network 选项卡中所示:

Breaking down the concatMap network log diagram

正如我们所见,只有在上一次保存完成后才会启动一个保存 HTTP 请求。 以下是 concatMap 运算符如何确保请求始终按顺序发生:

  • concatMap 正在获取每个表单值并将其转换为保存的 HTTP Observable,称为内部 Observable
  • concatMap 然后订阅内部 Observable 并将其输出发送到结果 Observable
    第二个表单值可能比在后端保存前一个表单值更快
  • 如果发生这种情况,新的表单值将不会立即映射到 HTTP 请求
  • 相反, concatMap 将等待先前的 HTTP Observable 完成,然后将新值映射到 HTTP Observable,订阅它并因此触发下一次保存

Observable Merging

将 Observable 串联应用于一系列 HTTP 保存操作似乎是确保保存按预期顺序发生的好方法。

但是在其他情况下,我们希望并行运行,而不需要等待前一个内部 Observable 完成。

为此,我们有合并 Observable 组合策略! 与 concat 不同,Merge 不会在订阅下一个 Observable 之前等待 Observable 完成。

相反,merge 同时订阅每个合并的 Observable,然后随着多个值随着时间的推移到达,它将每个源 Observable 的值输出到组合结果 Observable。

Practical Merge Example

为了明确合并不依赖于完成,让我们合并两个从未完成的 Observables,因为它们是 interval Observables:

const series1$ = interval(1000).pipe(map(val => val*10));

const series2$ = interval(1000).pipe(map(val => val*100));

const result$ = merge(series1$, series2$);

result$.subscribe(console.log);

使用 interval() 创建的 Observable 将每隔一秒发出值 0、1、2 等,并且永远不会完成。

请注意,我们对这些区间 Observable 应用了几个 map 运算符,只是为了更容易在控制台输出中区分它们。

以下是控制台中可见的前几个值:

0
0
10
100
20
200
30
300

Merging and Observable Completion

正如我们所见,合并的源 Observable 的值在发出时立即显示在结果 Observable 中。 如果合并的 Observable 之一完成,merge 将继续发出其他 Observable 随着时间到达的值。

请注意,如果源 Observables 完成,合并仍会以相同的方式工作。

The Merge Marble Diagram

看另一个例子:

正如我们所见,合并的源 Observables 的值立即显示在输出中。 直到所有合并的 Observable 完成后,结果 Observable 才会完成。

现在我们了解了合并策略,让我们看看它如何在高阶 Observable 映射的上下文中使用。

回到我们之前的表单草稿保存示例,很明显在这种情况下我们需要 concatMap 而不是 mergeMap,因为我们不希望保存并行发生。

让我们看看如果我们不小心选择了 mergeMap 会发生什么:


this.form.valueChanges
    .pipe(
        mergeMap(formValue =>
                 this.http.put(`/api/course/${courseId}`,
                               formValue))
    )
    .subscribe(
       saveResult =>  ... handle successful save ...,
        err => ... handle save error ...
    );

现在假设用户与表单交互并开始相当快地输入数据。 在这种情况下,我们现在会在网络日志中看到多个并行运行的保存请求:

正如我们所看到的,请求是并行发生的,在这种情况下是一个错误! 在高负载下,这些请求可能会被乱序处理。

Observable Switching

现在我们来谈谈另一个 Observable 组合策略:切换。 切换的概念更接近于合并而不是串联,因为我们不等待任何 Observable 终止。

但是在切换时,与合并不同,如果一个新的 Observable 开始发出值,我们将在订阅新的 Observable 之前取消订阅之前的 Observable。

Observable 切换就是为了确保未使用的 Observables 的取消订阅逻辑被触发,从而可以释放资源!

Switch Marble Diagram

注意对角线,这些不是偶然的! 在切换策略的情况下,在图中表示高阶 Observable 很重要,它是图像的顶行。

这个高阶 Observable 发出的值本身就是 Observable。

对角线从高阶 Observable 顶线分叉的那一刻,是 Observable 值被 switch 发出和订阅的那一刻。

Breaking down the switch Marble Diagram

这是这张图中发生的事情:

  • 高阶 Observable 发出它的第一个内部 Observable (a-b-c-d),它被订阅(通过 switch 策略实现)
  • 第一个内部 Observable (a-b-c-d) 发出值 a 和 b,它们立即反映在输出中
  • 但随后第二个内部 Observable (e-f-g) 被发射,这触发了第一个内部 Observable (a-b-c-d) 的取消订阅,这是切换的关键部分
  • 然后第二个内部 Observable (e-f-g) 开始发出新值,这些值反映在输出中
  • 但请注意,第一个内部 Observable (a-b-c-d) 同时仍在发出新值 c 和 d
  • 然而,这些后来的值没有反映在输出中,那是因为我们同时取消了第一个内部 Observable (a-b-c-d) 的订阅

我们现在可以理解为什么必须以这种不寻常的方式绘制图表,用对角线:这是因为我们需要在每个内部 Observable 被订阅(或取消订阅)时直观地表示,这发生在对角线从源高阶 Observable。

The RxJs switchMap Operator

然后让我们采用切换策略并将其应用于高阶映射。 假设我们有一个普通的输入流,它发出值 1、3 和 5。

然后我们将每个值映射到一个 Observable,就像我们在 concatMap 和 mergeMap 的情况下所做的那样,并获得一个更高阶的 Observable。

如果我们现在在发出的内部 Observable 之间切换,而不是连接或合并它们,我们最终会得到 switchMap 运算符:

Breaking down the switchMap Marble Diagram

这是该运算符的工作原理:

  • 源 observable 发出值 1、3 和 5
  • 然后通过应用映射函数将这些值转换为 Observable
  • 映射的内部 Observable 被 switchMap 订阅
  • 当内部 Observables 发出一个值时,该值会立即反映在输出中
  • 但是如果在前一个 Observable 有机会完成之前发出了像 5 这样的新值,则前一个内部 Observable (30-30-30) 将被取消订阅,并且它的值将不再反映在输出中
  • 注意上图中红色的 30-30-30 内部 Observable:最后 30 个值没有发出,因为 30-30-30 内部 Observable 被取消订阅

如我们所见,Observable 切换就是确保我们从未使用的 Observable 触发取消订阅逻辑。 现在让我们看看 switchMap 的运行情况!

Search TypeAhead - switchMap Operator Example

switchMap 的一个非常常见的用例是搜索 Typeahead。 首先让我们定义源 Observable,其值本身将触发搜索请求。

这个源 Observable 将发出值,这些值是用户在输入中键入的搜索文本:

const searchText$: Observable<string> =
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith('')
    )
    .subscribe(console.log);

此源 Observable 链接到用户键入其搜索的输入文本字段。 当用户输入单词“Hello World”作为搜索时,这些是 searchText$ 发出的值:

Debouncing and removing duplicates from a Typeahead

请注意重复值,要么是由于使用两个单词之间的空格,要么是使用 Shift 键将字母 H 和 W 大写。

为了避免将所有这些值作为单独的搜索请求发送到后端,让我们使用 debounceTime 运算符等待用户输入稳定:

const searchText$: Observable<string> =
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400)
    )
    .subscribe(console.log);

使用此运算符,如果用户以正常速度键入,则 searchText$ 的输出中现在只有一个值:Hello World

这已经比我们之前的要好得多,现在只有在稳定至少 400 毫秒时才会发出值!

但是如果用户在考虑搜索时输入缓慢,以至于两个值之间需要超过 400 毫秒,那么搜索流可能如下所示:

此外,用户可以键入一个值,按退格键并再次键入,这可能会导致重复的搜索值。 我们可以通过添加 distinctUntilChanged 操作符来防止重复搜索的发生。

Cancelling obsolete searches in a Typeahead

但更重要的是,我们需要一种方法来取消以前的搜索,因为新的搜索开始了。

我们在这里要做的是将每个搜索字符串转换为后端搜索请求并订阅它,并在两个连续的搜索请求之间应用切换策略,如果触发新的搜索,则取消之前的搜索。

这正是 switchMap 运算符将要做的! 这是使用它的 Typeahead 逻辑的最终实现:

const searchText$: Observable<string> =
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400),
        distinctUntilChanged()
    );

const lessons$: Observable<Lesson[]> = searchText$
    .pipe(
        switchMap(search => this.loadLessons(search))
    )
    .subscribe();

function loadLessons(search:string): Observable<Lesson[]> {

    const params = new HttpParams().set('search', search);

    return this.http.get(`/api/lessons/${coursesId}`, {params});
}

switchMap Demo with a Typeahead

现在让我们看看 switchMap 操作符的作用! 如果用户在搜索栏上输入,然后犹豫并输入其他内容,我们通常会在网络日志中看到以下内容:

正如我们所看到的,之前的一些搜索在进行时已被取消,这很棒,因为这将释放可用于其他事情的服务器资源。

The Exhaust Strategy

switchMap 操作符是预输入场景的理想选择,但在其他情况下,我们想要做的是忽略源 Observable 中的新值,直到前一个值被完全处理。

例如,假设我们正在触发后端保存请求以响应单击保存按钮。 我们可能首先尝试使用 concatMap 运算符来实现这一点,以确保保存操作按顺序发生:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        concatMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

这确保保存按顺序完成,但是如果用户多次单击保存按钮会发生什么? 以下是我们将在网络日志中看到的内容:

正如我们所见,每次点击都会触发自己的保存:如果我们点击 20 次,我们会得到 20 次保存! 在这种情况下,我们想要的不仅仅是确保按顺序进行保存。

我们还希望能够忽略点击,但前提是保存已经在进行中。 排气 Observable 组合策略将允许我们做到这一点。

Exhaust Marble Diagram

就像以前一样,我们在第一行有一个更高阶的 Observable,它的值本身就是 Observable,从第一行分叉出来。这是这张图中发生的事情:

  • 就像 switch 的情况一样,exhaust 订阅第一个内部 Observable (a-b-c)
    像往常一样,值 a、b 和 c 会立即反映在输出中
  • 然后发出第二个内部 Observable (d-e-f),而第一个 Observable (a-b-c) 仍在进行中
  • 第二个 Observable 被排放策略丢弃,并且不会被订阅(这是排放的关键部分)
    只有在第一个 Observable (a-b-c) 完成后,排气策略才会订阅新的 Observable
  • 当第三个 Observable (g-h-i) 发出时,第一个 Observable (a-b-c) 已经完成,所以第三个 Observable 不会被丢弃,会被订阅
  • 然后,第三个 Observable 的值 g-h-i 将显示在结果 Observable 的输出中,与输出中不存在的值 d-e-f 不同

就像 concat、merge 和 switch 的情况一样,我们现在可以在高阶映射的上下文中应用 exhaust 策略。

The RxJs exhaustMap Operator

现在让我们看看exhaustMap 操作符的弹珠图。 让我们记住,与上图的第一行不同,源 Observable 1-3-5 发出的值不是 Observable。

相反,这些值可以是例如鼠标点击:

所以这是在排放地图图的情况下发生的事情:

  • 发出值 1,并创建内部 Observable 10-10-10
  • Observable 10-10-10 发出所有值并在源 Observable 中发出值 3 之前完成,因此所有 10-10-10 值在输出中发出
  • 在输入中发出一个新值 3,触发一个新的 30-30-30 内部 Observable
  • 但是现在,虽然 30-30-30 仍在运行,但我们在源 Observable 中得到了一个新值 5
  • 这个值 5 被排气策略丢弃,这意味着从未创建 50-50-50 Observable,因此 50-50-50 值从未出现在输出中

A Practical Example for exhaustMap

现在让我们将这个新的exhaustMap Operator 应用到我们的保存按钮场景中:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        exhaustMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

如果我们现在点击保存,假设连续 5 次,我们将获得以下网络日志:

正如我们所看到的,我们在保存请求仍在进行时所做的点击被忽略了,正如预期的那样!

请注意,如果我们连续点击例如 20 次,最终正在进行的保存请求将完成,然后第二个保存请求将开始。

How to choose the right mapping Operator?

concatMap、mergeMap、switchMap 和exhaustMap 的行为相似,因为它们都是高阶映射运算符。

但它在许多微妙的方面也如此不同,以至于实际上没有一个运算符可以安全地指向默认值。

相反,我们可以简单地根据用例选择合适的运算符:

  • 如果我们需要在等待完成的同时按顺序做事情,那么 concatMap 是正确的选择
  • 对于并行处理,mergeMap 是最好的选择
  • 如果我们需要取消逻辑,switchMap 是要走的路
  • 为了在当前的 Observables 仍在进行时忽略新的 Observables,exhaustMap 就是这样做的

总结

正如我们所见,RxJ 的高阶映射运算符对于在响应式编程中执行一些非常常见的操作(例如网络调用)至关重要。

为了真正理解这些映射操作符及其名称,我们首先需要重点了解底层的Observable组合策略concat、merge、switch和exhaust。

我们还需要意识到有一个更高阶的映射操作正在发生,其中值被转换成分离的 Observables,并且这些 Observables 被映射运算符本身以隐藏的方式订阅。

选择正确的算子就是选择正确的内部 Observable 组合策略。 选择错误的运算符通常不会导致程序立即损坏,但随着时间的推移可能会导致一些难以解决的问题。

更多Jerry的原创文章,尽在:"汪子熙":

03-05 23:34