我正在尝试使用RxJS进行简单的简短民意测验。它需要每隔delay秒向服务器上的path位置发出一次请求,并在达到以下两个条件之一时结束:回调isComplete(data)返回true或它比maxTries尝试了更多次服务器。这是基本代码:

newShortPoll(path, maxTries, delay, isComplete) {
    return Observable.interval(delay)
    .take(maxTries)
    .flatMap((tryNumber) => http.get(path))
    .doWhile((data) => !isComplete(data));
  }

但是,doWhile在RxJS 5.0中不存在,因此由于take()调用,只能尝试服务器maxTries的条件有效,但是isComplete条件不起作用。我如何使它可观察将next()的值,直到isComplete返回true,这时它将next()该值和complete()。

我应该注意takeWhile()在这里对我不起作用。它不返回实际上是最重要的最后一个值,因为那是我们知道完成的时间。

谢谢!

最佳答案

我们可以创建一个实用程序函数来创建第二个Observable,它发出内部Observable发出的每个项目。但是,一旦满足条件,我们将调用onCompleted函数:

function takeUntilInclusive(inner$, predicate) {
    return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(item => {
            observer.onNext(item);

            if (predicate(item)) {
                observer.onCompleted();
            }
        }, observer.onError, observer.onCompleted);


        return () => {
            subscription.dispose();
        }
    });
}

这是使用我们的新实用程序方法的快速摘要:
const inner$ = Rx.Observable.range(0, 4);
const data$ = takeUntilInclusive(inner$, (x) => x > 2);
data$.subscribe(x => console.log(x));

// >> 0
// >> 1
// >> 2
// >> 3

该答案基于:RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

09-27 01:35