将每个源值投影到同一个 Observable,该 Observable 在输出 Observable 中使用 switchMap 多次展平。

输入一个 Observable,输出一个 function Operator. 实际是一个函数,每次在源 Observable 上发出值时,该函数都会返回一个 新的 Observable.

该函数从给定的 innerObservable 发出项目,并且仅从最近投影的内部 Observable 中获取值。

看个例子:

import { EMPTY, range } from 'rxjs';
import { first, take, tap } from 'rxjs/operators';

import { fromEvent, interval } from 'rxjs';
import { switchMapTo } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');

const test = event => console.log('Jerry: ', event);
const result = clicks.pipe(
  tap(test),

  switchMapTo(interval(1000))
);
result.subscribe(x => console.log(x));

输出:

每次点击之后,click$ 抛出的 PointerEvent,被 switchMapTo 返回的 Function Operator 丢弃了。最后用户订阅 result 函数里,打印的值,是 switchMapTo 输入的 interval(1000) Observable 发射的值,而不再是 clicks$ 抛出的 PointerEvent.

再看另一个在网页显示倒计时数字的例子。

import './style.css';

import { interval, fromEvent } from 'rxjs';
import {
  switchMapTo,
  scan,
  startWith,
  takeWhile,
  finalize
} from 'rxjs/operators';

const COUNTDOWN_TIME = 10;

// reference
const countdownElem = document.getElementById('countdown');

// streams
const click$ = fromEvent(document, 'click');
const countdown$ = interval(2000).pipe(
  scan((acc, _) => --acc, COUNTDOWN_TIME),
  startWith(COUNTDOWN_TIME)
);

click$
  .pipe(
    switchMapTo(countdown$),
    takeWhile(val => val >= -10),
    finalize(() => (countdownElem.innerHTML = "We're done here!"))
  )
  .subscribe((val: any) => (countdownElem.innerHTML = val));

初始整数是10,每隔2秒钟减一,减到 -10 时停止。

思路:触发计数器开始递减的操作是点击屏幕,因此需要使用 fromEvent 来构造 Observable :click$

每隔两秒钟执行某项操作,因此需要用 interval 构造第二个 Observable.

一旦计数器启动之后,每隔两秒钟需要执行递减操作,因此需要用 switchMapTo,将 click$ 映射成 interval Observable.

之后的值传递,就和 click$ 再无任何关联了。

因为是递减操作,暗示这是一个 stateful 场景,故选用 scan 操作符维护内部状态。

更多Jerry的原创文章,尽在:"汪子熙":

03-05 23:34