问题描述
我根本不了解 mergeMap
的目的.我听说有两种解释:
I don't understand the purpose of mergeMap
at all. I have heard are two explanations:
- 这就像.NET LINQ中的
SelectAll()
-不. - 这是RxJS
merge
和map
的组合-不,(或者我无法复制).
- It's like
SelectAll()
in .NET LINQ - nope. - It's a combination of RxJS
merge
andmap
- nope (or I can't replicate this).
考虑以下代码:
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
标有谁知道什么"的最后一块是除了 obs1
上的地图外,没有什么-意思是什么?
The last piece labelled "Who knows what" does nothing more than a map on obs1
- what's the point?
mergeMap
实际做什么?有效用例的例子是什么?(最好带有一些代码)
What does mergeMap
actually do? What is an example of a valid use case? (Preferably with some code)
完全没有帮助的文章(上面的mergeMap代码来自其中之一): 1 ,2
Articles that didn't help me at all (mergeMap code from above is from one of these): 1, 2
推荐答案
tl; dr; mergeMap
比 map
更强大.了解 mergeMap
是访问Rx的全部功能的必要条件.
tl;dr; mergeMap
is way more powerful than map
. Understanding mergeMap
is the necessary condition to access full power of Rx.
-
mergeMap
和map
都作用于单个流(与zip
,combineLatest
相比)
both
mergeMap
andmap
acts on a single stream (vs.zip
,combineLatest
)
mergeMap
和 map
都可以转换流元素(相对于 filter
, delay
)
both mergeMap
and map
can transform elements of a stream (vs. filter
, delay
)
-
无法更改源流的大小(假设:
map
本身不抛出
);对于源中的每个元素,仅发射一个mapped
元素;map
不能忽略元素(例如filter
);
cannot change size of the source stream (assumption:
map
itself does notthrow
); for each element from source exactly onemapped
element is emitted;map
cannot ignore elements (like for examplefilter
);
在使用默认调度程序的情况下,转换是同步进行的;是100%清晰的:源流可以异步传递其元素,但是每个下一个元素都会立即被 mapped
映射并进一步重新发射; map
不能及时移动元素,例如 delay
in case of the default scheduler the transformation happens synchronously; to be 100% clear: the source stream may deliver its elements asynchronously, but each next element is immediately mapped
and re-emitted further; map
cannot shift elements in time like for example delay
对返回值没有限制
id
: x =>x
-
可以更改源流的大小;对于每个元素,可能会创建或发出任意数量(0、1或许多)的新元素
can change size of the source stream; for each element there might be arbitrary number (0, 1 or many) of new elements created/emitted
它提供了对异步性的完全控制-无论何时创建/发出新元素,以及应同时处理源流中的多少个元素;例如,假设源流发出10个元素,但 maxConcurrency
设置为2,则将立即处理前两个元素,其余8个元素将被缓冲;一旦完成处理后的
中的一个,则将处理源流中的下一个元素,依此类推-有点棘手,但请看下面的示例
it offers full control over asynchronicity - both when new elements are created/emitted and how many elements from the source stream should be processed concurrently; for example assume source stream emitted 10 elements but maxConcurrency
is set to 2 then two first elements will be processed immediately and the rest 8 buffered; once one of the processed complete
d the next element from source stream will be processed and so on - it is bit tricky, but take a look at the example below
所有其他运算符都可以使用 mergeMap
和 Observable
构造函数
all other operators can be implemented with just mergeMap
and Observable
constructor
可用于递归异步操作
返回值必须是Observable类型(或者Rx必须知道如何从中创建可观察对象-例如promise,array)
return values has to be of Observable type (or Rx has to know how to create observable out of it - e.g. promise, array)
id
: x =>Rx.Observable.of(x)
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
类比不能显示完整图片,它基本上与 .mergeMap
对应,且 maxConcurrency
设置为1.在这种情况下,元素将按上述顺序排序,但在一般情况下不一定是这样.我们唯一的保证是,新元素的发射将根据它们在基础流中的位置进行排序.例如: [3,1,2,4,9,1]
和 [2,3,1,1,9,4]
是有效的,但> [1,1,4,2,3,9]
不是(因为在底层流中的 2
之后发出了 4
).
The analogy does not show full picture and it basically corresponds to .mergeMap
with maxConcurrency
set to 1. In such a case elements will be ordered as above, but in general case it does not have to be so. The only guarantee we have is that emission of new elements will be order by their position in the underlying stream. For example: [3,1,2,4,9,1]
and [2,3,1,1,9,4]
are valid, but [1,1,4,2,3,9]
is not (since 4
was emitted after 2
in the underlying stream).
// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>
这篇关于mergeMap运算符的用例是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!