问题描述
在我的生产代码中,当 Mono 超时时,我的日志中出现错误.
我已设法使用以下代码重新创建这些错误:
In my Production code, I am getting errors in my logs when a Mono times out.
I have managed to recreate these errors with the following code:
@Test
public void testScheduler() {
Mono<String> callableMethod1 = callableMethod();
callableMethod1.block();
Mono<String> callableMethod2 = callableMethod();
callableMethod2.block();
}
private Mono<String> callableMethod() {
return Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"));
}
在 Mono.fromCallable
中,我使用第三方库进行阻塞调用.当此调用超时时,我会收到类似于
In the Mono.fromCallable
I am making a blocking call using a third-party library. When this call times out, I get errors similar to
reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
这些错误似乎也是间歇性的,有时当我运行代码时,我根本没有发现任何错误.但是,当我以 10 的循环重复调用时,我始终得到它们.
These errors also seem to be intermittent, sometimes when I run the code provided I get no errors at all. However when I repeat the call in a loop of say 10, I consistently get them.
推荐答案
问题:为什么会出现这个错误?
Question: Why does this error happen?
答案:
当指定给 timeout() 操作符的持续时间已过时,它会抛出一个 TimeoutException.这会导致以下结果:
When the duration given to the timeout() operator has passed, it throws a TimeoutException. That results in the following outcomes:
一个 onError 信号被发送到主反应链.结果,主执行被恢复,进程继续进行(即执行onErrorResume()).
An onError signal is sent to the main reactive chain. As a result, the main execution is resumed and the process moves on (i.e., onErrorResume() is executed).
在结果 #1 之后不久,fromCallable() 中定义的异步任务被中断,这会触发第二个异常 (InterruptedException).主反应链无法再处理这个 InterruptedException 因为 TimeoutException 首先发生并且已经导致主反应链恢复(注意:这种行为不生成第二个 onError 信号符合响应式流规范 -> 出版商 #7).
Shortly after outcome #1, the async task defined within fromCallable() is interrupted, which triggers a 2nd exception (InterruptedException). The main reactive chain can no longer handle this InterruptedException because the TimeoutException happened first and already caused the main reactive chain to resume (Note: this behavior of not generating a 2nd onError signal conforms with the Reactive Stream Specification -> Publisher #7).
由于主链无法正常处理第二个异常(InterruptedException),Reactor 将其记录在错误级别以告知我们发生了意外异常.
Since the 2nd exception (InterruptedException) can't be handled gracefully by the main chain, Reactor logs it at error level to let us know an unexpected exception occurred.
问题:我该如何摆脱它们?
简短回答:使用 Hooks.onErrorDropped() 更改日志级别:
Short Answer: Use Hooks.onErrorDropped() to change the log level:
Logger logger = Logger.getLogger(this.getClass().getName());
@Test
public void test() {
Hooks.onErrorDropped(error -> {
logger.log(Level.WARNING, "Exception happened:", error);
});
Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
长答案:如果您的用例允许,您可以处理 fromCallable() 内发生的异常,这样影响主链的唯一异常是 超时异常.在这种情况下,onErrorDropped() 一开始就不会发生.
Long Answer: If your use-case allows, you could handle the exception happening within fromCallable() so that the only exception affecting the main chain is the TimeoutException. In that case, the onErrorDropped() wouldn't happen in the first place.
@Test
public void test() {
Mono.fromCallable(() -> {
try {
Thread.sleep(60);
} catch (InterruptedException ex) {
//release resources, rollback actions, etc
logger.log(Level.WARNING, "Something went wrong...", ex);
}
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
额外参考:
https://medium.com/@kalpads/configuring-timeouts-in-spring-reactive-webclient-4bc5faf56411
这篇关于“操作符调用默认的 onErrorDropped"单声道超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!