我有一个可观测的源source1发出值,如果它在超过2秒的时间内没有发出任何东西,我想切换到后备源source2。如果source1再次发射,我想从中发射。等等,无限期的。
到目前为止,我有

import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';

declare const source1: Observable;
declare const source2: Observable;

source1.pipe(
    timeout(2000),
    catchError(() => {
      return source2.pipe(
        takeUntil(source1)
      );
    }),
    concat(source1)
).subscribe(val => console.log(val));

这几乎行得通。如果source1在2秒后不发射,它将从source2发射,直到source1再次发射,然后切换到source1。但有两个主要缺陷:
source1再次发射时,第一个发射值被takeUntil捕获(source1是一个热观测值),并且不会在concat(source1)中。
如果source1停止发射第二次,我想有同样的行为。在我的实现中,它只工作一次。
知道我怎么解决这个问题吗?

最佳答案

我猜你可以通过共享source1,然后使用repeat重新提交到同一个链(我没有测试它):

const shared1 = source1.pipe(share());

source1.pipe(
  timeout(2000),
  catchError(() => merge(source1, source2).pipe(
    takeUntil(source1),
  )),
  repeat(),
).subscribe(val => console.log(val));

关于javascript - 如果source1超时,并且当source1退回时,RxJS从source2发出从source1发出,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54687527/

10-09 09:46