本文介绍了如何重试反应堆中出现故障的耗材记录-卡夫卡的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。
我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板:
reactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(ConsumerRecord::value)
.flatMap(this::consumeWithRetry)
.onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
.retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
.subscribe();
让我们考虑一下Consumer方法,如下所示
public Mono<Void> consume(MessageRecord message){
return Mono.error(new RuntimeException("test retry"); //sample error scenario
}
我正在使用以下逻辑在失败时重试Consumer方法。
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
如果当前消费者记录因异常而失败,我希望重试使用消息。我试图用另一次重试来包装消费方法(3),但这并不能达到目的。最后一次重试时仅用于重试Kafka重新平衡的订阅。@Simon-Baslé@Gary-Russell
推荐答案
以前重试时,我使用了以下方法:
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
但它不是重试。添加Mono.defer后,上述代码将正常工作,并添加所需的重试。
public Mono<Void> consumeWithRetry(MessageRecord message){
return Mono.defer(()->consume(message))
.retry(2);
}
这篇关于如何重试反应堆中出现故障的耗材记录-卡夫卡的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!