在以下代码中,我创建了一个简单的可观察对象,该对象产生一个值,然后完成。然后,我分享那个可观察的重播最后一项并订阅3次。紧随其后的第一个,产生值之前的第二个,产生值之后第三次,并且可观察的对象已经完成。
let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
}).shareReplay(1);
obs$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')
);
}, 1000);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')
);
}, 6000);
您可以execute this on jsbin
结果是下面的大理石图
Actual
s1: -----1$
s2: \--1$
s3: \1$
但是我希望
Expected
s1: -----1$
s2: \--1$
s3: \----2$
我可以理解为什么有人想要第一个行为,但是我的理由是,与本示例不同,在我返回数字的同时,我可能返回的对象是易于退订行为的对象,例如数据库连接。如果上面的大理石图表示数据库连接,那么在dispose方法中我调用
db.close()
的情况下,在第三个订阅上,我会有一个异常,因为我正在接收释放的数据库处理程序作为值。 (因为当第二个订阅完成refCount = 0并处理了源时)。这个例子还有另一个奇怪的事情是,即使它解决了
第一个值并在其后完成,它两次订阅源(如重复的“ Creating observable”所示)
我知道this github issue谈论这个问题,但我缺少的是:
如何实现(在RxJs4和5中)共享的可观察对象,如果源可观察对象尚未完成,则可以重播最后一项;如果源可观察对象尚未完成,则重播最后一个项目(refCount = 0),重新创建可观察对象。
在RxJs5中,我认为共享方法可以解决问题的重新连接部分,但不能解决共享部分。
在RxJs4中,我一无所知
如果可能,我想使用现有的运算符或主题解决此问题。我的直觉告诉我,我必须使用这种逻辑来创建一个不同的Subject,但是我还没有到位。
最佳答案
关于shareReplay的一点:shareReplay
在返回的Observable的剩余生命周期中保留相同的基础ReplaySubject
实例。ReplaySubject
完成后,您将无法再输入任何值,但仍会重播。所以...
您第一次订阅可观察对象,超时开始。这会将i
从0
递增到1
。
您第二次订阅可观察的消息,并且超时已经过去了。
将触发超时回调并发出onNext(i)
,然后发出onCompleted()
。onCompleted()
信号完成了ReplaySubject
内部的shareReplay
,这意味着从现在开始,共享的可观察对象将简单地重播其具有的值(即1)并完成。
一般而言,关于共享可观察对象的一点:
另一个独立的问题是,由于您共享了可观察对象,因此只能一次调用订阅者函数。这意味着i
只会被增加一次。因此,即使您没有onCompleted
并杀死底层的ReplaySubject
,您最终也不会将其递增到2
。
这不是RxJS 5
一种快速的判断方法是onNext
vs next
。您当前在示例中使用的是RxJS 4,但是您已经用RxJS 5对其进行了标记,并且发现RxJS 5中存在问题。RxJS5是beta版,并且是一个完全重写RxJS 4的新版本。 API更改主要是为了匹配es-observable proposal which is currently at stage 1
更新的例子
I've updated your example to give you your expected results
基本上,您希望对前两个调用使用可观察到的共享版本,而对第三个调用使用原始可观察到的共享版本。
let i = 0;
let obs$ = Rx.Observable.create(obs => {
console.log('Creating observable');
i++;
setTimeout(() => {
obs.onNext(i);
obs.onCompleted();
}, 2000);
})
let shared$ = obs$.shareReplay(1);
shared$.subscribe(
data => console.log(`s1: data = ${data}`),
() => {},
() => console.log('finish s1')
);
setTimeout( () => {
shared$.subscribe(
data => console.log(`s2: data = ${data}`),
() => {},
() => console.log('finish s2')
);
}, 1000);
setTimeout( () => {
obs$.subscribe(
data => console.log(`s3: data = ${data}`),
() => {},
() => console.log('finish s3')
);
}, 6000);
无关
另外,提示:请确保为调用
clearTimeout
的自定义可观察对象返回取消语义。