我将状态保存在一个ReplaySubject中,以重播状态的最后一个副本。从该状态,派生其他ReplaySubjects来保持...很好的派生状态。每个重播主题仅需要保持其最后的计算状态/派生状态。 (我们不使用BehaviorSubjects,因为它们总是提供一个值,但我们只希望从父级可观察对象派生一个值。)如果我们已经生成派生状态,则始终有必要将值重放给新的订户。

我有一个自定义的可观察运算符,可以按照我想要的方式完成此操作,但是感觉并不干净。我觉得应该有一种有效的方法来利用RxJ的运算符(operator)本身来完成此任务。

我尝试了两种最明显的方法,但是每种方法都有一些小问题。问题涉及取消订阅和重新订阅。
打开下面的小提琴,打开控制台,然后单击运行。我将用每个输出描述问题。

https://jsfiddle.net/gfe1nryp/1/

refCount编辑的ReplaySubject的问题

=== RefCounted Observable ===


Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Subscription 2: 3
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

这很好用,当没有订阅时,中间函数不起作用。但是,一旦我们重新订阅。我们可以看到订阅2在取消订阅之前重播了最后一个状态,然后根据base$状态中的当前值播放了派生状态。这是不理想的。

connect ed ReplaySubject 问题
=== Hot Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Work
Work
Work
Resubscribe
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

这个问题与refCount ed可观察到的问题不同,在取消订阅之前,没有不必要的最后状态重播。但是,由于observable现在很热,所以需要权衡的是,即使有任何订阅未使用该值,只要有新值进入base$状态,我们总是会工作。

最后,我们有了自定义运算符:
=== Custom Observable ===
Work
Subscription 1: 1
Work
Subscription 1: 2
Work
Subscription 1: 3
Unsubscribe
Resubscribe
Work
Subscription 2: 6
Work
Subscription 2: 7
Work
Subscription 2: 8

啊,两全其美。它不仅不会不必要地重播取消订阅之前的最后一个值,而且在没有订阅的情况下也不会不必要地进行任何工作。
这是通过手动创建RefCountReplaySubject的组合来完成的。我们跟踪每个订户,当它达到0时,我们刷新重播值。它的代码在这里(当然,在小提琴中):
Rx.Observable.prototype.selectiveReplay = function() {
  let subscribers = [];
  let innerSubscription;

  let storage = null;

  return Rx.Observable.create(observer => {
    if (subscribers.length > 0) {
      observer.next(storage);
    }

    subscribers.push(observer);

    if (!innerSubscription) {
      innerSubscription = this.subscribe(val => {
        storage = val;
        subscribers.forEach(subscriber => subscriber.next(val))
      });
    }

    return () => {
      subscribers = subscribers.filter(subscriber => subscriber !== observer);

      if (subscribers.length === 0) {
        storage = null;
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
    };
  });
};

因此,这个可观察的自定义变量已经可以使用了。 但是,仅使用RxJS运算符可以完成吗?请记住,这样的主题可能有多个相互关联。在示例中,我仅使用一个链接到base$来说明我在最基本的级别上尝试过的两种常用方法的问题。
基本上,如果您只能使用RxJS运算符,并获得与上述=== Custom Observable ===的输出匹配的输出。这就是我想要的。谢谢!

最佳答案

您应该能够将multicast与主题工厂而不是主题一起使用。 cf. https://jsfiddle.net/pto7ngov/1/

(function(){
  console.log('=== RefCounted Observable ===');
  var base$ = new Rx.ReplaySubject(1);

  var listen$ = base$.map(work).multicast(()=> new Rx.ReplaySubject(1)).refCount();

  var subscription1 = listen$.subscribe(x => console.log('Subscription 1: ' + x));

  base$.next(1);
  base$.next(2);
  base$.next(3);

    console.log('Unsubscribe');
  subscription1.unsubscribe();

  base$.next(4);
  base$.next(5);
  base$.next(6);

    console.log('Resubscribe');
  var subscription2 = listen$.subscribe(x => console.log('Subscription 2: ' + x));

  base$.next(7);
  base$.next(8);
})();

multicast operator的重载完全可以满足您的用例。每次multicast运算符返回的可观察对象完成并重新连接时,它都会使用提供的工厂创建一个新的主题。虽然它没有很好的文档记录,但是它基本上是从Rxjs v4复制现有的API。

万一我误解了或者不起作用,请告诉我,

08-15 14:34