我是Observers的新手,但我仍在设法弄清楚它们。我有以下代码:

observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null)  // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> {
              LOG.info("Exception thrown on realtime events");
          })
        .forEach(awardChecker::awardFailOrIgnore);
getRealTimeEvents()返回一个Observable<Event>
.doOnError的位置重要吗?另外,在这段代码中添加多个调用会对您产生什么影响?我已经意识到我可以做到,并且所有这些都可以被调用,但是我不确定它的目的是什么。

最佳答案

是的,它确实。当错误在特定点通过流时,doOnError起作用,因此,如果在doOnError抛出之前,运算符将被调用。但是,如果将doOnError放在更上方,则可能会或可能不会调用doOnError,具体取决于链中的下游运算符。

给定

Observer<Object> ignore = new Observer<Object>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(Object t) {
    }
};

例如,以下代码将始终调用doOnError:
Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);

但是,此代码不会:
Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);

大多数运营商会反弹源自下游的异常。

如果通过onErrorResumeNextonExceptionResumeNext转换异常,则添加multipe ojit_code是可行的:
Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);

否则,您将在链的多个位置记录相同的异常。

关于java - 可观察的doOnError正确位置,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29897173/

10-10 16:13