我正在尝试使用RxJS进行简单的简短民意测验。它需要每隔delay
秒向服务器上的path
位置发出一次请求,并在达到以下两个条件之一时结束:回调isComplete(data)
返回true或它比maxTries
尝试了更多次服务器。这是基本代码:
newShortPoll(path, maxTries, delay, isComplete) {
return Observable.interval(delay)
.take(maxTries)
.flatMap((tryNumber) => http.get(path))
.doWhile((data) => !isComplete(data));
}
但是,doWhile在RxJS 5.0中不存在,因此由于take()调用,只能尝试服务器
maxTries
的条件有效,但是isComplete
条件不起作用。我如何使它可观察将next()的值,直到isComplete返回true,这时它将next()该值和complete()。我应该注意
takeWhile()
在这里对我不起作用。它不返回实际上是最重要的最后一个值,因为那是我们知道完成的时间。 谢谢!
最佳答案
我们可以创建一个实用程序函数来创建第二个Observable,它发出内部Observable发出的每个项目。但是,一旦满足条件,我们将调用onCompleted函数:
function takeUntilInclusive(inner$, predicate) {
return Rx.Observable.create(observer => {
var subscription = inner$.subscribe(item => {
observer.onNext(item);
if (predicate(item)) {
observer.onCompleted();
}
}, observer.onError, observer.onCompleted);
return () => {
subscription.dispose();
}
});
}
这是使用我们的新实用程序方法的快速摘要:
const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));
// >> 0
// >> 1
// >> 2
// >> 3
该答案基于:RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after