问题描述
我试图了解流是如何通过RXjs中的管道传输的。
我知道这不应该是一个问题,因为这是异步流的整个想法 - 但仍然有一些东西我希望了解。
I'm trying to understand how the stream is transmitted through the pipe in RXjs.
I know that this should not be a concern because that's the whole idea with async streams - but still there's something I want to understand.
查看此代码:
var source = Rx.Observable
.range(1, 3)
.flatMapLatest(function (x) { //`switch` these days...
return Rx.Observable.range(x*100, 2);
});
source.subscribe(value => console.log('I got a value ', value))
结果:
I got a value 100
I got a value 200
I got a value 300
I got a value 301
我相信(IIUC)图表是这样的:(通知标记为101,201,取消订阅)
I believe (IIUC) that the diagram is something like this : (notice striked 101,201 which are unsubscribed)
----1---------2------------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------
以下是问题:
问题:
是否始终保证2将在(101)之前到达?那个3是在(201)之前到达的吗?
Is it always guaranteed that 2 will arrive before the (101) ? same as that 3 is arriving before (201) ?
我的意思是 - 如果我不想看一个时间线,那么下面的图表是完全合法的发生:
I mean - if I'm not suppose to look at a time line so it is perfectly legal for the following diagram to occur :
----1---------------2---------------3------------------------------|
░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------
其中 2
稍微延迟到达,其中101是已经发出
Where 2
arrived with a slight delay where 101 was already emitted
我在这里缺少什么?管道如何工作?
What am I missing here? How does the pipe work here ?
推荐答案
对于具有此特定RxJS版本的特定Observable链,排放的顺序始终如一是相同的。
For this particular Observable chain with this particular RxJS version the order of emissions is going to always be the same.
如前所述,在RxJS 4中,它使用 currentThread
调度程序,如下所示:。
所有调度程序(立即$ c $除外) c>来自RxJS 4)所以顺序总是相同的。
As already mentioned, in RxJS 4 it uses the currentThread
scheduler as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate
from RxJS 4) are internally using some type of a queue so the order is always the same.
事件的顺序与你在图中显示的非常相似(.. 。或者至少我认为是):
The order of events is very similar to what you showed in the diagram (... or at least I think it is):
-
1
被安排和发出因为它是队列中的唯一动作。 -
100
已安排。此时,调度程序队列中没有其他操作,因为尚未安排2
。RangeObservable
后递归调度另一个发射。这意味着100
安排在2
之前。 - 。
-
100
已发出,101
已安排 -
2
已发出,101
已被处置。 - ...等等
1
is scheduled and emitted because it's the only action in the queue.100
is scheduled. At this point there are no more action in the Scheduler's queue because2
hasn't been scheduled yet. TheRangeObservable
schedules another emission recursively after it callsonNext()
. This means that100
is scheduled before2
.2
is scheduled.100
is emitted,101
is scheduled2
is emitted,101
is disposed.- ... and so on
请注意,此行为在RxJS 4和RxJS 5中有所不同。
Note that this behavior is different in RxJS 4 and RxJS 5.
在RxJS 5中,大多数Observable和运算符默认情况下不使用任何Scheduler(一个明显的例外是Observables / operator需要处理延迟)。所以在并立即开始在循环中发布值。
In RxJS 5 most Observables and operators by default don't use any Scheduler (an obvious exception are Observables/operator that need to work with delays). So in RxJS 5 the RangeObservable
won't schedule anything and start emitting values right away in a loop.
RxJS 5中的相同示例将产生不同的结果结果:
The same example in RxJS 5 will produce different result:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2);
});
source.subscribe(value => console.log('I got a value ', value));
这将打印以下内容:
I got a value 100
I got a value 101
I got a value 200
I got a value 201
I got a value 300
I got a value 301
但是,如果你添加例如延迟(0)
。常识表明这不应该做任何事情:
However, this will change significantly if you add for example delay(0)
. The common sense suggests that this shouldn't do anything:
const source = Observable
.range(1, 3)
.switchMap(function (x) {
return Observable.range(x * 100, 2).delay(0);
});
source.subscribe(value => console.log('I got a value ', value));
现在只安排内部 RangeObservable
并且重复处理几次,这使得它只发出最后一个 RangeObservable
的值:
Now only the inner RangeObservable
is scheduled and disposed all over again several times which makes it emit only values from the very the last RangeObservable
:
I got a value 300
I got a value 301
这篇关于如何在RXJS中传输异步流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!