我根本不了解mergeMap
的目的。我听说有两种解释:
SelectAll()
-不。 merge
和map
的组合-不,(或者我不能复制它)。 考虑following code:
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
标记为“谁知道什么”的最后一块,只不过是obs1
上的 map 而已-有什么意义?mergeMap
实际做什么?有效用例的例子是什么? (最好带有一些代码)完全没有帮助的文章(上面的mergeMap代码来自其中之一):1,2
最佳答案
tl; dr; mergeMap
比map
更强大。了解mergeMap
是访问Rx的全部功能的必要条件。
相似点
mergeMap
和map
都作用于单个流(与zip
和combineLatest
相比)mergeMap
和map
都可以转换流元素(与filter
和delay
相比)差异
map
map
本身不是throw
);对于源中的每个元素,仅发出一个mapped
元素; map
不能忽略元素(例如filter
);mapped
并进一步重新发射; map
无法像delay
一样及时移动元素id
:x => x
mergeMap
maxConcurrency
设置为2,那么将立即处理两个第一个元素,并对其余8个元素进行缓冲;一旦处理了complete
d中的一个,就将处理源流中的下一个元素,依此类推-有点棘手,但请看下面的示例mergeMap
和Observable
构造函数来实现id
:x => Rx.Observable.of(x)
数组类比
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
这个类比不能显示完整图片,它基本上与.mergeMap
设置为1的maxConcurrency
相对应。在这种情况下,元素将按上述顺序排序,但通常情况下不必如此。我们唯一的保证是,新元素的发射将根据它们在基础流中的位置进行排序。例如:[3,1,2,4,9,1]
和[2,3,1,1,9,4]
有效,但[1,1,4,2,3,9]
无效(因为4
是在基础流中的2
之后发出的)。使用
mergeMap
的几个示例:// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>