在以下代码中,我创建了一个简单的可观察对象,该对象产生一个值,然后完成。然后,我分享那个可观察的重播最后一项并订阅3次。紧随其后的第一个,产生值之前的第二个,产生值之后第三次,并且可观察的对象已经完成。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
}).shareReplay(1);

obs$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );
}, 6000);


您可以execute this on jsbin

结果是下面的大理石图

Actual
s1: -----1$
s2:   \--1$
s3:           \1$


但是我希望

Expected
s1: -----1$
s2:   \--1$
s3:           \----2$


我可以理解为什么有人想要第一个行为,但是我的理由是,与本示例不同,在我返回数字的同时,我可能返回的对象是易于退订行为的对象,例如数据库连接。如果上面的大理石图表示数据库连接,那么在dispose方法中我调用db.close()的情况下,在第三个订阅上,我会有一个异常,因为我正在接收释放的数据库处理程序作为值。 (因为当第二个订阅完成refCount = 0并处理了源时)。

这个例子还有另一个奇怪的事情是,即使它解决了
第一个值并在其后完成,它两次订阅源(如重复的“ Creating observable”所示)

我知道this github issue谈论这个问题,但我缺少的是:

如何实现(在RxJs4和5中)共享的可观察对象,如果源可观察对象尚未完成,则可以重播最后一项;如果源可观察对象尚未完成,则重播最后一个项目(refCount = 0),重新创建可观察对象。

在RxJs5中,我认为共享方法可以解决问题的重新连接部分,但不能解决共享部分。

在RxJs4中,我一无所知

如果可能,我想使用现有的运算符或主题解决此问题。我的直觉告诉我,我必须使用这种逻辑来创建一个不同的Subject,但是我还没有到位。

最佳答案

关于shareReplay的一点:

shareReplay在返回的Observable的剩余生命周期中保留相同的基础ReplaySubject实例。

ReplaySubject完成后,您将无法再输入任何值,但仍会重播。所以...


您第一次订阅可观察对象,超时开始。这会将i0递增到1
您第二次订阅可观察的消息,并且超时已经过去了。
将触发超时回调并发出onNext(i),然后发出onCompleted()
onCompleted()信号完成了ReplaySubject内部的shareReplay,这意味着从现在开始,共享的可观察对象将简单地重播其具有的值(即1)并完成。


一般而言,关于共享可观察对象的一点:

另一个独立的问题是,由于您共享了可观察对象,因此只能一次调用订阅者函数。这意味着i只会被增加一次。因此,即使您没有onCompleted并杀死底层的ReplaySubject,您最终也不会将其递增到2

这不是RxJS 5

一种快速的判断方法是onNext vs next。您当前在示例中使用的是RxJS 4,但是您已经用RxJS 5对其进行了标记,并且发现RxJS 5中存在问题。RxJS5是beta版,并且是一个完全重写RxJS 4的新版本。 API更改主要是为了匹配es-observable proposal which is currently at stage 1

更新的例子

I've updated your example to give you your expected results

基本上,您希望对前两个调用使用可观察到的共享版本,而对第三个调用使用原始可观察到的共享版本。

let i = 0;
let obs$ = Rx.Observable.create(obs => {
  console.log('Creating observable');
  i++;
  setTimeout(() => {
     obs.onNext(i);
     obs.onCompleted();
  }, 2000);
})


let shared$ = obs$.shareReplay(1);

shared$.subscribe(
  data => console.log(`s1: data = ${data}`),
  () => {},
  () => console.log('finish s1')
);

setTimeout( () => {
  shared$.subscribe(
    data => console.log(`s2: data = ${data}`),
    () => {},
    () => console.log('finish s2')

  );
}, 1000);

setTimeout( () => {
  obs$.subscribe(
    data => console.log(`s3: data = ${data}`),
    () => {},
    () => console.log('finish s3')

  );
}, 6000);


无关

另外,提示:请确保为调用clearTimeout的自定义可观察对象返回取消语义。

10-06 10:01