本文介绍了如何重试反应堆中出现故障的耗材记录-卡夫卡的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。

我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板:

reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .map(ConsumerRecord::value)
                .flatMap(this::consumeWithRetry)
                .onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
                .retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
                .subscribe();

让我们考虑一下Consumer方法,如下所示

public Mono<Void> consume(MessageRecord message){
   return Mono.error(new RuntimeException("test retry"); //sample error scenario
}

我正在使用以下逻辑在失败时重试Consumer方法。

public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}
如果当前消费者记录因异常而失败,我希望重试使用消息。我试图用另一次重试来包装消费方法(3),但这并不能达到目的。最后一次重试时仅用于重试Kafka重新平衡的订阅。

@Simon-Baslé@Gary-Russell

推荐答案

以前重试时,我使用了以下方法:

public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

但它不是重试。添加Mono.defer后,上述代码将正常工作,并添加所需的重试。

public Mono<Void> consumeWithRetry(MessageRecord message){
   return Mono.defer(()->consume(message))
          .retry(2);
}

这篇关于如何重试反应堆中出现故障的耗材记录-卡夫卡的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-29 16:12