我如何构造一个“保持前进”的可观察对象
独立于任何订户(即refCount等)
问题)并为最新订阅者提供最新价值?

这是我尝试过的:

// Approach 1
const myObservable$ = Rx.Observable.timer(0, 1000).publish();
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// 0
// 1
// 2
// 3
// 4
// late 4
// 4
// late 5
// ...


方法1的问题在于,晚期订户在t = 3.5s
没有得到“当前值” 3。
我想要的是

// 0
// 1
// 2
// 3
// late 3
// 4
// late 4
// ...


另一种方法使用publishValue

// Approach 2
const myObservable$ = Rx.Observable.timer(0, 1000).publishValue();
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// undefined
// 0
// 1
// 2
// 3
// late 3
// 4
// late 4
// ...


在方法2中,迟到的订户在t = 3.5s时获得“正确”值。
这种方法的问题是我们需要提供一个初始
我们可能并不总是拥有的价值。

// Approach 3
const myObservable$ = Rx.Observable.timer(0, 1000).replay(1);
myObservable.connect();
myObservable$.subscribe(x => console.log(x));
setTimeOut(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

// 0
// 1
// 2
// 3
// late 0
// late 1
// late 2
// late 3
// 4
// late 4
// ...


在这一点上我迷路了。我的印象是.replay(1)
也许可以解决我的问题,但不知何故,它确实重播了多个事件。

有任何想法吗?

最佳答案

方法3是您问题的正确答案。但是,您正在使用interface incorrectly

  /**
   *
   * @example
   * var res = source.replay(null, 3);
   * var res = source.replay(null, 3, 500);
   * var res = source.replay(null, 3, 500, scheduler);
   * var res = source.replay(function (x) { return x.take(6).repeat(); }, 3, 500, scheduler);
   *
   * @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
   * @param bufferSize [Optional] Maximum element count of the replay buffer.
   * @param windowSize [Optional] Maximum time length of the replay buffer.
   * @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
   * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
   */
Rx.Observable.prototype.replay([selector], [bufferSize], [window], [scheduler])


您将要使用第一个重载source.replay(null, 3),因此您的代码应为:

const myObservable$ = Rx.Observable.timer(0, 1000).replay(null, 1);
myObservable$.connect();
myObservable$.subscribe(x => console.log(x));
setTimeout(function () {
  myObservable$.subscribe(x => console.log("late", x));
}, 3500);

09-25 18:38