Question
Is there a way to automatically adjust delay between elements in Project Reactor based on downstream health?
I have an application that reads records from a Kafka topic, sends an HTTP request for each one of them, and writes the result to another Kafka topic. Reading from and writing to Kafka is fast and easy, but the third-party HTTP service is easily overwhelmed, so I use delayElements() with a value from a property file, which means the value does not change while the application is running. Here's a code sample:
kafkaReceiver.receiveAutoAck()
    // Flatten the per-batch Flux of records, preserving order
    .concatMap(identity())
    // Fixed delay, read once from the property file
    .delayElements(ofMillis(delayElement))
    .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
    .onErrorContinue(handleError())
    .map(this::getSenderRecord)
    .flatMap(kafkaSender::send)
However, the third-party service's performance may vary over time, and I'd like to adjust this delay accordingly. For example, if more than 5% of requests fail over a 10-second period, I would increase the delay; if the failure rate stays below 5% for more than 10 seconds, I would reduce it again.
Is there an existing mechanism for that in Reactor? I can think of some creative solutions on my side, but I was wondering whether the Reactor team (or someone else) has already implemented one.
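The rule described in the question (increase the delay when more than 5% of requests fail within 10 seconds, decrease it otherwise) can be sketched without any Reactor-specific machinery. This is a minimal sketch under stated assumptions: AdaptiveDelay, record and currentDelay are hypothetical names, not a Reactor API; the delay is doubled or halved at most once per window; and the caller passes the current time in milliseconds so the logic is easy to test.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, NOT part of Reactor: keeps request outcomes from the last
// `window` and, at most once per window, doubles the delay if the failure ratio
// exceeds `threshold`, otherwise halves it, clamped to [minDelay, maxDelay].
final class AdaptiveDelay {
    private final long windowMillis;
    private final double threshold;
    private final long minDelayMillis;
    private final long maxDelayMillis;
    // Each entry is {timestampMillis, 1 if the request failed, else 0}
    private final Deque<long[]> outcomes = new ArrayDeque<>();
    private long delayMillis;
    private long lastAdjustMillis;

    AdaptiveDelay(Duration window, double threshold,
                  Duration minDelay, Duration maxDelay, long nowMillis) {
        if (minDelay.toMillis() <= 0 || maxDelay.compareTo(minDelay) < 0) {
            throw new IllegalArgumentException("need 0 < minDelay <= maxDelay");
        }
        this.windowMillis = window.toMillis();
        this.threshold = threshold;
        this.minDelayMillis = minDelay.toMillis();
        this.maxDelayMillis = maxDelay.toMillis();
        this.delayMillis = minDelayMillis;
        this.lastAdjustMillis = nowMillis;
    }

    // Call once per completed request; synchronized because flatMap callbacks
    // may run on different threads
    synchronized void record(boolean failed, long nowMillis) {
        outcomes.addLast(new long[] {nowMillis, failed ? 1 : 0});
        // Drop outcomes that fell out of the window (now - window, now]
        while (!outcomes.isEmpty() && outcomes.peekFirst()[0] <= nowMillis - windowMillis) {
            outcomes.removeFirst();
        }
        // Re-evaluate at most once per window so each decision sees a full window
        if (outcomes.isEmpty() || nowMillis - lastAdjustMillis < windowMillis) {
            return;
        }
        long failures = 0;
        for (long[] o : outcomes) {
            failures += o[1];
        }
        double failureRatio = (double) failures / outcomes.size();
        if (failureRatio > threshold) {
            delayMillis = Math.min(maxDelayMillis, delayMillis * 2); // back off
        } else {
            delayMillis = Math.max(minDelayMillis, delayMillis / 2); // speed up again
        }
        lastAdjustMillis = nowMillis;
    }

    synchronized Duration currentDelay() {
        return Duration.ofMillis(delayMillis);
    }
}
```

One possible way to wire it in (an untested assumption, not a documented Reactor pattern): replace .delayElements(ofMillis(delayElement)) with .concatMap(m -> Mono.just(m).delayElement(adaptiveDelay.currentDelay())), so the delay is read again for every element, and call adaptiveDelay.record(...) with System.currentTimeMillis() from doOnNext / doOnError on the HTTP call inside the flatMap. Doubling and halving is only one policy; decreasing the delay by a fixed step instead would let the pipeline recover more cautiously.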
Answer
You can add a retry with exponential backoff, something like this:
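influx()
    // Wrap each element in its own Mono so a failure can be retried per element
    .flatMap(x -> Mono.just(x)
        .map(data -> apiCall(data))
        .retryWhen(
            // Exponential backoff with jitter, starting at 30 seconds
            Retry.backoff(Integer.MAX_VALUE, Duration.ofSeconds(30))
                // Only RuntimeExceptions are retried; other errors fail immediately
                .filter(err -> err instanceof RuntimeException)
                .doBeforeRetry(
                    s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
        // Drop the element once it fails for good, so the stream keeps going
        .onErrorResume(err -> Mono.empty()),
        concurrency_val,
        prefetch_val)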
This retries a failed request up to Integer.MAX_VALUE times. The first retry happens after at least 30 seconds, and the backoff then grows exponentially (doubling on each attempt). Each delay is also randomly offset by a configurable jitter factor (default 0.5, i.e. up to 50% of the computed delay in either direction), but it never drops below the 30-second minimum.
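The schedule described above can be modeled roughly as follows. This is a simplified sketch of the documented behavior, not Reactor's source code; BackoffSketch and delayFor are hypothetical names, and the exact rounding and clamping in Reactor may differ.

```java
import java.time.Duration;
import java.util.Random;

// Simplified model (an assumption, NOT Reactor's implementation) of exponential
// backoff with jitter: the nominal delay doubles per attempt starting at minBackoff,
// is capped at maxBackoff, and is shifted by a random offset of up to
// +/- jitterFactor * nominal, clamped so the result stays within [minBackoff, maxBackoff].
final class BackoffSketch {
    static Duration delayFor(int attempt, Duration minBackoff, Duration maxBackoff,
                             double jitterFactor, Random random) {
        long min = minBackoff.toMillis();
        long max = maxBackoff.toMillis();
        long nominal;
        try {
            // minBackoff * 2^attempt, capped at maxBackoff
            nominal = Math.min(max, Math.multiplyExact(min, 1L << Math.min(attempt, 62)));
        } catch (ArithmeticException overflow) {
            nominal = max;
        }
        long jitter = (long) (nominal * jitterFactor);
        // Offset range, clamped so that min <= nominal + offset <= max
        long low = Math.max(min - nominal, -jitter);
        long high = Math.min(max - nominal, jitter);
        long offset = low + (long) (random.nextDouble() * (high - low));
        return Duration.ofMillis(nominal + offset);
    }
}
```

With a 30-second minimum and the default 0.5 jitter, the first delay falls between 30 and 45 seconds and the third between 60 and 180 seconds. In Reactor itself the corresponding knobs are jitter(double) and maxBackoff(Duration) on RetryBackoffSpec.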
For the exact semantics, see the documentation of Retry.backoff.
Also, since the whole operation is wrapped in flatMap, you can change its default concurrency and prefetch values to account for the maximum number of requests that can be failing at any given time while the whole pipeline waits for the RetryBackoffSpec to complete successfully.
In the worst case, all concurrency_val in-flight requests have failed and are waiting 30+ seconds for their retry. The whole pipeline can then stall (still waiting for the downstream to succeed), which may be undesirable if the downstream system doesn't recover in time. It's better to replace the Integer.MAX_VALUE retry limit with something manageable, beyond which the error is simply logged and processing moves on to the next event.
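To make "something manageable" concrete: ignoring jitter and the duration of the HTTP calls themselves, the nominal time a single element spends waiting before its retries are exhausted is a geometric series. Here n is the maxAttempts passed to Retry.backoff with a 30-second minimum backoff; n = 5 is an illustrative choice, not a value from the answer.

```latex
T(n) = \sum_{i=0}^{n-1} 30 \cdot 2^{i}\,\text{s} = 30\,(2^{n} - 1)\,\text{s},
\qquad T(5) = 30 \cdot 31\,\text{s} = 930\,\text{s} \approx 15.5\ \text{min}
```

Up to concurrency_val elements can be parked like this at the same time. Capping the per-retry delay shortens this further: with Retry.backoff(5, Duration.ofSeconds(30)).maxBackoff(Duration.ofMinutes(2)), the nominal delays become 30, 60, 120, 120 and 120 seconds, or 450 seconds in total.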