假设我希望可观察对象定期发出值,直到另一个可观察对象发出为止。因此,我可以使用timer
和takeUntil
达到目标。
但是然后我想处理每个发出的值,并在某些条件变为真时停止发出(错误)消息。所以我写下一段代码:
const { timer, Subject } = 'rxjs';
const { takeUntil, map } = 'rxjs/operators';
const s = new Subject();
let count = 0;
function processItem(item) {
count++;
return count < 3;
}
const value = "MyValue";
timer(0, 1000).pipe(
takeUntil(s),
map(() => {
if (processItem(value)) {
console.log(`Processing done`);
return value;
} else {
s.next(true);
s.complete();
console.error(`Processing error`);
throw new Error(`Stop pipe`);
}
}),
)
Playground
但是我没有完成错误,而是完成了我的Observable。
仅当我注释掉
takeUntil(s)
运算符时,我才会出错。看起来当管道运算符完成时,它的值不会立即发出,而是会在管道的下一个“迭代”结束时记住并发出,然后由新结果替换,依此类推。在我的情况下,
takeUntil
防止了应该发出错误的下一次迭代。我的假设是正确的吗?如果是这样,为什么rxjs是用这种方式设计的? 最佳答案
首先,每个Rx链都可以发出一个error
或一个complete
通知,但不能两者都发出。 (请参见http://reactivex.io/documentation/contract.html“合同管理通知”部分)。takeUntil
运算符在通知时发出complete
Observable(在您的情况下为s
)发出任何next
通知。这意味着s
发出时,链将完成,并且您将永远不会再收到任何error
通知。
最后一件事,而且可能是最令人困惑的是,除非您需要花费时间(例如delay
运算符)或专门将observeOn
运算符与异步调度程序一起使用,否则RxJS中的所有操作都是同步发生的。因此,当您在s.next(true)
内部调用map
时,此next
通知会立即传播到takeUntil
,从而完成了链接,并且如上所述,您可以收到一个error
或一个complete
通知,但不会同时接收到这两个通知。
看起来您甚至不需要使用takeUntil
,因为如果在map
中抛出错误,它会自动包装并作为error
通知(How to throw error from RxJS map operator (angular))进一步发送,并且链会自动处理,因此在此之后尝试使用takeUntil
完成此操作毫无意义。
关于javascript - 可观察管道中的异常被抑制,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/58235240/