如何等待一个流(例如,StreamA)返回非空值,然后调用StreamB订阅函数。我对StreamA的值并不特别感兴趣。反过来,我试图获取StreamB的值,该值可能在StreamA返回非空值之前已更新,并且可能没有任何新事件。

我通过查看以下内容尝试了一下,但很可惜:RxJS: How can I do an "if" with Observables?,但是很遗憾,它无法正常工作。这是因为,没有导出的类pausable,rxjs v 5.0.0-beta.6。

根据答案,这是我想出的程度。

导出类AuthService {
      userModel:FirebaseListObservable = this.af.database.list(/users);
      构造函数(私有af:AngularFire){

    var user = this.currentAuthor();
    var userStream = user;

    this.af.auth.flatMap((d) => { console.log(d);return this.userModel.publishReplay(1); });

    this.userModel
      .subscribe((data) => {
        var flag = false;
        data.forEach((item) => {
          if (item.$key && item.$key === user.uid) {
            flag = true;
            return;
          }
        });

        if (flag) {
          console.log('hello');
        } else {
          this.userModel.push({
              firstName: user.auth.displayName.substr(0, user.auth.displayName.lastIndexOf(' ')),
              lastName: user.auth.displayName.substr(user.auth.displayName.lastIndexOf(' '), user.auth.displayName.length),
              displayPic: user.auth.photoURL,
              provider: user.provider,
              uid: user.uid
            }
          );
        }
      })
  }
  public currentAuthor():FirebaseAuthState  {
    return this.af.auth.getAuth();
  }


希望我能说清楚。甚至我现在也感到困惑。 :p。

我是rxjs和反应式编程的新手。并且,任何帮助将不胜感激。

顺便说一句,感谢您的光临。 :)

最佳答案

我想plausible是指pausable吗?我不确定您到底要在此实现什么(控制流程?)。但是,如果要在streamA产生值之后再使用streamB值,则可以使用flatMap

streamA.flatMapLatest(function (_){return streamB})

这样,在streamA发出的任何时间,您就可以得到streamB在此之后发出的值。

如果需要包括在此之前发出的最后一个B的值,则可以使用streamBB = streamB.publishReplay(1)

streamA.flatMapLatest(function (_){return streamBB})

还没有测试过,所以如果可以的话请及时更新。

09-25 18:02