我根本不了解mergeMap的目的。我听说有两种解释:

  • 就像.NET LINQ中的SelectAll()-不。
  • 这是RxJS mergemap的组合-不,(或者我不能复制它)。

  • 考虑following code:
        var obs1 = new Rx.Observable.interval(1000);
        var obs2 = new Rx.Observable.interval(1000);
    
        //Just a merge and a map, works fine
        obs1.merge(obs2).map(x=> x+'a').subscribe(
          next => console.log(next)
        )
    
        //Who know what - seems to do the same thing as a plain map on 1 observable
        obs1.mergeMap(val => Rx.Observable.of(val + `B`))
            .subscribe(
              next => console.log(next)
            )
    
    标记为“谁知道什么”的最后一块,只不过是obs1上的 map 而已-有什么意义?mergeMap实际做什么?有效用例的例子是什么? (最好带有一些代码)
    完全没有帮助的文章(上面的mergeMap代码来自其中之一):12

    最佳答案

    tl; dr; mergeMapmap更强大。了解mergeMap是访问Rx的全部功能的必要条件。

    相似点

  • mergeMapmap都作用于单个流(与zipcombineLatest相比)
  • mergeMapmap都可以转换流元素(与filterdelay相比)

  • 差异
    map
  • 无法更改源流的大小(假设:map本身不是throw);对于源中的每个元素,仅发出一个mapped元素; map不能忽略元素(例如filter);
  • 在默认调度程序的情况下,转换是同步发生的;是100%清晰的:源流可以异步传递其元素,但是每个下一个元素立即被mapped并进一步重新发射; map无法像delay一样及时移动元素
  • 对返回值没有限制
  • id:x => x

  • mergeMap
  • 可以更改源流的大小;对于每个元素,可能会创建或发出任意数量(0、1或许多)的新元素
  • 它提供了对异步性的完全控制-无论何时创建/发出新元素,以及应同时处理源流中的多少个元素;例如,假设源流发出了10个元素,但是maxConcurrency设置为2,那么将立即处理两个第一个元素,并对其余8个元素进行缓冲;一旦处理了complete d中的一个,就将处理源流中的下一个元素,依此类推-有点棘手,但请看下面的示例
  • 所有其他运算符都可以仅使用mergeMapObservable构造函数来实现
  • 可用于递归异步操作
  • 返回值必须是Observable类型(或者Rx必须知道如何从中创建可观察对象-例如promise,array)
  • id:x => Rx.Observable.of(x)

  • 数组类比
    let array = [1,2,3]
    fn             map                    mergeMap
    x => x*x       [1,4,9]                error /*expects array as return value*/
    x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]
    
    这个类比不能显示完整图片,它基本上与.mergeMap设置为1的maxConcurrency相对应。在这种情况下,元素将按上述顺序排序,但通常情况下不必如此。我们唯一的保证是,新元素的发射将根据它们在基础流中的位置进行排序。例如:[3,1,2,4,9,1][2,3,1,1,9,4]有效,但[1,1,4,2,3,9]无效(因为4是在基础流中的2之后发出的)。
    使用mergeMap的几个示例:

    // implement .map with .mergeMap
    Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
      return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
    }
    
    Rx.Observable.range(1, 3)
      .mapWithMergeMap(x => x * x)
      .subscribe(x => console.log('mapWithMergeMap', x))
    
    // implement .filter with .mergeMap
    Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
      return this.mergeMap(x =>
        filterFn(x) ?
        Rx.Observable.of(x) :
        Rx.Observable.empty()); // return no element
    }
    
    Rx.Observable.range(1, 3)
      .filterWithMergeMap(x => x === 3)
      .subscribe(x => console.log('filterWithMergeMap', x))
    
    // implement .delay with .mergeMap
    Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
      return this.mergeMap(x =>
        Rx.Observable.create(obs => {
          // setTimeout is naive - one should use scheduler instead
          const token = setTimeout(() => {
            obs.next(x);
            obs.complete();
          }, delayMs)
          return () => clearTimeout(token);
        }))
    }
    
    Rx.Observable.range(1, 3)
      .delayWithMergeMap(500)
      .take(2)
      .subscribe(x => console.log('delayWithMergeMap', x))
    
    // recursive count
    const count = (from, to, interval) => {
      if (from > to) return Rx.Observable.empty();
      return Rx.Observable.timer(interval)
        .mergeMap(() =>
          count(from + 1, to, interval)
          .startWith(from))
    }
    
    count(1, 3, 1000).subscribe(x => console.log('count', x))
    
    // just an example of bit different implementation with no returns
    const countMoreRxWay = (from, to, interval) =>
      Rx.Observable.if(
        () => from > to,
        Rx.Observable.empty(),
        Rx.Observable.timer(interval)
        .mergeMap(() => countMoreRxWay(from + 1, to, interval)
          .startWith(from)))
    
    const maxConcurrencyExample = () =>
      Rx.Observable.range(1,7)
        .do(x => console.log('emitted', x))
        .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
        .do(x => console.log('processed', x))
        .subscribe()
    
    setTimeout(maxConcurrencyExample, 3100)
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

    10-07 13:39