我创建了一个Observable,可以将其结果缓存一段时间。这个例子很棒,非常有用!但是我无法为商品生产者设置超时时间。我试图在mockDataFetch()中使用超时操作符,但是在第一个失败的项目之后,无法恢复该流。
如何在超时的情况下实现mockDataFetch

这正是我所做的:

const Observable = Rx.Observable;

var counter = 1;
var updateRequest = Observable.defer(() => mockDataFetch())
    .publishReplay(1, 1000)
    .refCount();

function mockDataFetch() {
    return Observable.of(counter++)
        .delay(Math.floor((Math.random() * 100) + 1))
        .timeout(50);
}

function mockHttpCache() {
    return updateRequest
        .take(1);
}


另一方面,如果在mockDataFetch内获得例外会怎样?我希望在下一个项目(在publishReplay方法中定义的1000毫秒后)的observable发出一个新项目。

最佳答案

我认为我应该更新示例并添加此用例,因为这是一种很常见的情况(无论如何,我很高兴您发现它很有用!)。

当从mockDataFetch()返回的Observable发送错误/完成通知时,主题内部将其自身标记为已停止(请参见说明Rx.Subject loses events),因此它不会重新发布任何项目。理想情况下,您可以使用catch()中的mockDataFetch()运算符捕获所有错误:

function mockDataFetch() {
    return Observable.of(counter++)
        .delay(Math.floor((Math.random() * 100) + 1))
        .timeout(50)
        .catch(err => Observable.of('This request is broken.'));
}


观看现场演示:https://jsbin.com/jiguti/5/edit?js,console

例如,由此产生的输出如下所示:

Response 0: This request is broken.
Response 50: This request is broken.
Response 200: This request is broken.
Response 1200: 2
Response 1500: 2
Response 3500: This request is broken.

07-25 21:56