Is there a way to automatically adjust delay between elements in Project Reactor based on downstream health?
I have an application that reads records from a Kafka topic, sends an HTTP request for each of them, and writes the result to another Kafka topic. Reading from and writing to Kafka is fast and easy, but the third-party HTTP service is easily overwhelmed, so I use delayElements() with a value from a property file, which means the value does not change while the application is running. Here's a code sample:
.flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
However, the third-party service might perform differently over time, and I'd like to be able to adjust the delay accordingly. For example, if more than 5% of requests fail over a 10-second period, I would increase the delay. If the failure rate stays below 5% for more than 10 seconds, I would reduce the delay again.
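To make the rule concrete, here is a minimal sketch of the logic in plain Java, independent of Reactor. The class name AdaptiveDelay, the doubling/halving step, and the min/max bounds are my own assumptions for illustration; only the 5% threshold and the 10-second window come from the description above.

```java
import java.time.Duration;

/** Hypothetical helper: counts request outcomes per window and adjusts a delay. */
final class AdaptiveDelay {
    private final Duration min;
    private final Duration max;
    private final double failureThreshold; // e.g. 0.05 for 5%
    private Duration current;
    private int total;
    private int failed;

    AdaptiveDelay(Duration initial, Duration min, Duration max, double failureThreshold) {
        this.current = initial;
        this.min = min;
        this.max = max;
        this.failureThreshold = failureThreshold;
    }

    /** Record the outcome of one HTTP request. */
    synchronized void record(boolean success) {
        total++;
        if (!success) {
            failed++;
        }
    }

    /**
     * Close the current window (to be called by a timer, e.g. every 10 seconds).
     * Doubles the delay if the failure rate exceeded the threshold, halves it
     * otherwise, and keeps the result within [min, max]. An empty window
     * leaves the delay unchanged.
     */
    synchronized Duration closeWindow() {
        if (total > 0) {
            double failureRate = (double) failed / total;
            Duration next = failureRate > failureThreshold
                    ? current.multipliedBy(2)
                    : current.dividedBy(2);
            if (next.compareTo(max) > 0) next = max;
            if (next.compareTo(min) < 0) next = min;
            current = next;
        }
        total = 0;
        failed = 0;
        return current;
    }

    synchronized Duration currentDelay() {
        return current;
    }

    public static void main(String[] args) {
        AdaptiveDelay delay = new AdaptiveDelay(
                Duration.ofMillis(100), Duration.ofMillis(50), Duration.ofSeconds(1), 0.05);
        for (int i = 0; i < 90; i++) delay.record(true);
        for (int i = 0; i < 10; i++) delay.record(false);
        System.out.println("Delay after a bad window: " + delay.closeWindow());
    }
}
```

The idea would be to call record(...) from the success and error paths of each request, call closeWindow() from a 10-second timer, and read currentDelay() wherever the delay is applied.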
Is there an existing mechanism for that in Reactor? I can think of some creative solutions from my side, but was wondering if they (or someone else) already implemented that.
You can add a retry with exponential backoff. Something like this:
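.flatMap(x -> Mono.just(x)
        .map(data -> apiCall(data))
        // retry failed calls with exponential backoff, starting at 30 seconds
        .retryWhen(
            Retry.backoff(Integer.MAX_VALUE, Duration.ofSeconds(30))
                .filter(err -> err instanceof RuntimeException)
                .doBeforeRetry(
                    s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
        // once retries are exhausted, drop the element and move on
        .onErrorResume(err -> Mono.empty()),
    concurrency_val)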
This will retry the failed request up to Integer.MAX_VALUE times, with a minimum backoff of 30 seconds before each retry. The backoff grows exponentially between successive retries, and each delay is additionally randomized by a configurable jitter factor (default value = 0.5).
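To get a feel for how the delays grow, here is a rough plain-Java approximation of such a schedule. It is only a sketch of my understanding of exponential backoff with jitter, not Reactor's actual code: the class BackoffSchedule, its method names, and the one-hour cap are made up for illustration, and I am assuming the jittered delay is kept within the configured minimum and maximum.

```java
import java.time.Duration;

/** Hypothetical illustration of an exponential backoff schedule with jitter bounds. */
final class BackoffSchedule {

    /** Nominal delay before retry number `attempt` (0-based): min * 2^attempt, capped at max. */
    static Duration nominal(Duration min, Duration max, int attempt) {
        try {
            Duration d = min.multipliedBy(1L << Math.min(attempt, 62));
            return d.compareTo(max) > 0 ? max : d;
        } catch (ArithmeticException overflow) {
            // The multiplication no longer fits in a Duration, so the cap applies.
            return max;
        }
    }

    /**
     * Range in which the randomized delay can fall: nominal +/- (nominal * jitter),
     * clamped to [min, max] (the clamping is an assumption of this sketch).
     */
    static Duration[] jitterBounds(Duration nominal, Duration min, Duration max, double jitter) {
        long offset = Math.round(nominal.toMillis() * jitter);
        Duration low = nominal.minusMillis(offset);
        Duration high = nominal.plusMillis(offset);
        if (low.compareTo(min) < 0) low = min;
        if (high.compareTo(max) > 0) high = max;
        return new Duration[] {low, high};
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(30);
        Duration max = Duration.ofHours(1); // assumed cap, for illustration only
        for (int attempt = 0; attempt < 6; attempt++) {
            Duration n = nominal(min, max, attempt);
            Duration[] bounds = jitterBounds(n, min, max, 0.5);
            System.out.println("retry " + (attempt + 1) + ": nominal " + n
                    + ", between " + bounds[0] + " and " + bounds[1]);
        }
    }
}
```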
Also, since the whole operation is mapped in flatMap, you can vary its default concurrency and prefetch values to account for the maximum number of requests that can be failing at any given time while the whole pipeline waits for the RetryBackoffSpec to complete successfully.
In the worst case, concurrency_val requests have failed and are each waiting 30+ seconds for a retry. The whole operation might stall (still waiting for success from downstream), which may not be desirable if the downstream system doesn't recover in time. It is better to replace the retry limit of Integer.MAX_VALUE with something manageable, beyond which the pipeline just logs the error and proceeds with the next event.
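When picking that limit, it can help to estimate how long one stalled element would hold its flatMap slot before being dropped. The following is a back-of-the-envelope sketch in plain Java; the RetryBudget class and the maximum-backoff cap are assumptions for illustration, and jitter is ignored, so real waits will differ somewhat.

```java
import java.time.Duration;

/** Hypothetical helper: estimates the total wait across a bounded number of retries. */
final class RetryBudget {

    /**
     * Sum of the nominal backoff delays for `retries` attempts, where the delay
     * starts at `min`, doubles after each attempt, and never exceeds `max`.
     */
    static Duration totalNominalWait(Duration min, Duration max, int retries) {
        Duration total = Duration.ZERO;
        Duration next = min;
        for (int i = 0; i < retries; i++) {
            total = total.plus(next);
            Duration doubled = next.multipliedBy(2);
            next = doubled.compareTo(max) > 0 ? max : doubled;
        }
        return total;
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(30);
        Duration max = Duration.ofHours(1); // assumed cap, for illustration only
        for (int retries = 1; retries <= 6; retries++) {
            System.out.println(retries + " retries -> about "
                    + totalNominalWait(min, max, retries).getSeconds() + " s of waiting");
        }
    }
}
```

With a 30-second minimum backoff, for example, five retries already add up to roughly fifteen minutes of nominal waiting per element, which gives an idea of how quickly a large limit turns into a stalled pipeline.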