假设我希望可观察对象定期发出值,直到另一个可观察对象发出为止。因此,我可以使用timertakeUntil达到目标。
但是然后我想处理每个发出的值,并在某些条件变为真时停止发出(错误)消息。所以我写下一段代码:

const { timer, Subject } = 'rxjs';
const { takeUntil, map } = 'rxjs/operators';

const s = new Subject();
let count = 0;
function processItem(item) {
  count++;
  return count < 3;
}

const value = "MyValue";

timer(0, 1000).pipe(
      takeUntil(s),
      map(() => {
        if (processItem(value)) {
          console.log(`Processing done`);
          return value;
        } else {
          s.next(true);
          s.complete();
          console.error(`Processing error`);
          throw new Error(`Stop pipe`);
        }
      }),
    )


Playground

但是我没有完成错误,而是完成了我的Observable。
仅当我注释掉takeUntil(s)运算符时,我才会出错。
看起来当管道运算符完成时,它的值不会立即发出,而是会在管道的下一个“迭代”结束时记住并发出,然后由新结果替换,依此类推。在我的情况下,takeUntil防止了应该发出错误的下一次迭代。我的假设是正确的吗?如果是这样,为什么rxjs是用这种方式设计的?

最佳答案

首先,每个Rx链都可以发出一个error或一个complete通知,但不能两者都发出。 (请参见http://reactivex.io/documentation/contract.html“合同管理通知”部分)。

takeUntil运算符在通知时发出complete Observable(在您的情况下为s)发出任何next通知。这意味着s发出时,链将完成,并且您将永远不会再收到任何error通知。

最后一件事,而且可能是最令人困惑的是,除非您需要花费时间(例如delay运算符)或专门将observeOn运算符与异步调度程序一起使用,否则RxJS中的所有操作都是同步发生的。因此,当您在s.next(true)内部调用map时,此next通知会立即传播到takeUntil,从而完成了链接,并且如上所述,您可以收到一个error或一个complete通知,但不会同时接收到这两个通知。

看起来您甚至不需要使用takeUntil,因为如果在map中抛出错误,它会自动包装并作为error通知(How to throw error from RxJS map operator (angular))进一步发送,并且链会自动处理,因此在此之后尝试使用takeUntil完成此操作毫无意义。

关于javascript - 可观察管道中的异常被抑制,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58235240/

10-11 13:22