问题描述
使用 Spring Cloud Stream 上通道的标准配置,消息重试 3 次,然后跳过.如果以下消息处理成功完成,则提交偏移量.这意味着在瞬态异常下消息可能会丢失.
With the standard configuration of a channel on Spring Cloud Stream a message is retried 3 times and then skipped. If the following message processing completes successfully the offset is committed.That means that under transient exceptions messages can be lost.
是否可以更改此行为,使通道卡在一条失败的消息上,直到瞬态条件得到修复?
Can this behavior be changed, so the channel get stuck on a failing message until the transient condition is repaired?
我已尝试配置重试模板,但您必须指定重试次数,这看起来像是一个无用的参数,因为所需的行为是永远重试.
I have tried configuring the retry template, but you have to specify a number of retries, which looks like a useless parameter, as the desired behavior is retrying forever.
有人遇到过这些问题吗?谢谢.
Did anyone fall into these troubles? Thank you.
我也怀疑这会如何干扰 max.poll.interval.ms
属性.
I also have some doubts about how this can interfere with the max.poll.interval.ms
property.
推荐答案
在 binder 中禁用重试并使用 ListenerContainerCustomizer
添加具有无限重试的 SeekToCurrentErrorHandler
...
Disable retry in the binder and use a ListenerContainerCustomizer
to add a SeekToCurrentErrorHandler
with infinite retries...
@SpringBootApplication
public class So63193500Application {
public static void main(String[] args) {
SpringApplication.run(So63193500Application.class, args);
}
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
throw new RuntimeException("test");
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
if (group.equals("so63193500")) {
container.setErrorHandler(new SeekToCurrentErrorHandler(
new FixedBackOff(5_000, FixedBackOff.UNLIMITED_ATTEMPTS)));
}
};
}
}
spring.cloud.stream.bindings.input-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.input-in-0.group=so63193500
这会导致搜索并且不会影响轮询间隔,只要回退间隔不是太长.
This causes a seek and won't affect the poll interval as long as the back off interval is not too long.
您也可以使用 ExponentialBackOff
.
这篇关于从最后一条成功消息中重放 Kafka 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!