我是reactivex/rxjs的新手,我想知道我的用例是否可以顺利地与rxjs结合使用,最好是与内置操作符结合使用。以下是我想要实现的目标:
我有一个与rest api通信的angular2应用程序。应用程序的不同部分需要在不同的时间访问相同的信息。为了避免一次又一次地发出相同的请求而影响服务器,我想添加客户端缓存。缓存应该发生在一个服务层中,在这个服务层中,网络调用实际上是在这里进行的。然后,这个服务层只分发Observables。缓存必须对应用程序的其余部分透明:它应该只知道Observables,而不是缓存。
因此,在最初的60秒内,rest api中的特定信息应该只被检索一次,比方说,60秒,即使有十几个组件在这60秒内从服务请求此信息。每个认购人必须在认购时获得可观察到的(单个)最后价值。
目前,我正是通过这样一种方法做到了这一点:

public getInformation(): Observable<Information> {
  if (!this.information) {
    this.information = this.restService.get('/information/')
      .cache(1, 60000);
  }
  return this.information;
}

在本例中,restService.get(...)执行实际的网络调用并返回一个Observable,这与angular的http服务非常相似。
这种方法的问题是刷新缓存:虽然它确保网络调用只执行一次,并且缓存的值在60秒后不再被推送到新的订阅服务器,但它不会在缓存过期后重新执行初始请求。因此,在60秒缓存之后发生的订阅将不会从Observable中获得任何值。
如果缓存超时后发生新订阅,是否可以重新执行初始请求,并再次将新值重新缓存60秒?
另一个好处是:如果现有订阅(例如,那些发起了第一个网络调用的订阅)能够获得更新后的值,而更新后的订阅已经启动了对该值的获取,那么它将更酷,这样一旦信息被刷新,它将立即通过整个可观察的感知应用程序传递。

最佳答案

我想出了一个解决办法来实现我想要的。它可能与reactivex命名法和最佳实践背道而驰,但从技术上讲,它完全符合我的要求。也就是说,如果有人仍然找到一种方法来实现相同的只是内置的运营商,我会很高兴接受一个更好的答案。
因此,基本上,由于我需要一种在订阅时重新触发网络调用的方法(没有轮询,没有计时器),所以我研究了ReplaySubject是如何实现的,甚至把它用作我的基类。然后我创建了一个基于回调的类RefreshingReplaySubject(命名改进欢迎!)。在这里:

export class RefreshingReplaySubject<T> extends ReplaySubject<T> {

  private providerCallback: () => Observable<T>;
  private lastProviderTrigger: number;
  private windowTime;

  constructor(providerCallback: () => Observable<T>, windowTime?: number) {
    // Cache exactly 1 item forever in the ReplaySubject
    super(1);
    this.windowTime = windowTime || 60000;
    this.lastProviderTrigger = 0;
    this.providerCallback = providerCallback;
  }

  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    // Hook into the subscribe method to trigger refreshing
    this._triggerProviderIfRequired();
    return super._subscribe(subscriber);
  }

  protected _triggerProviderIfRequired() {
    let now = this._getNow();
    if ((now - this.lastProviderTrigger) > this.windowTime) {
      // Data considered stale, provider triggering required...
      this.lastProviderTrigger = now;
      this.providerCallback().first().subscribe((t: T) => this.next(t));
    }
  }
}

下面是最终的用法:
public getInformation(): Observable<Information> {
  if (!this.information) {
    this.information = new RefreshingReplaySubject(
      () => this.restService.get('/information/'),
      60000
    );
  }
  return this.information;
}

09-10 12:15
查看更多