编辑我刚刚编写了一个测试案例,以演示如何使用@KafkaListener ... 使用状态恢复/* * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.springframework.kafka.annotation;import static org.assertj.core.api.Assertions.assertThat;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.junit.ClassRule;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.SeekToCurrentErrorHandler;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.kafka.test.rule.KafkaEmbedded;import org.springframework.kafka.test.utils.KafkaTestUtils;import org.springframework.messaging.handler.annotation.Header;import org.springframework.retry.RetryState;import 
org.springframework.retry.backoff.ExponentialBackOffPolicy;import org.springframework.retry.support.DefaultRetryState;import org.springframework.retry.support.RetryTemplate;import org.springframework.test.annotation.DirtiesContext;import org.springframework.test.context.junit4.SpringRunner;/** * @author Gary Russell * @since 5.0 * */@RunWith(SpringRunner.class)@DirtiesContextpublic class StatefulRetryTests { private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry"; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1"); @Autowired private Config config; @Autowired private KafkaTemplate<Integer, String> template; @Test public void testStatefulRetry() throws Exception { this.template.send("sr1", "foo"); assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.listener1().result).isTrue(); } @Configuration @EnableKafka public static class Config { @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; } @Bean public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return consumerProps; } @Bean public KafkaTemplate<Integer, String> template() { KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); return kafkaTemplate; } @Bean public ProducerFactory<Integer, String> 
producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { return KafkaTestUtils.producerProps(embeddedKafka); } @Bean public Listener listener1() { return new Listener(); } } public static class Listener { private static final RetryTemplate retryTemplate = new RetryTemplate(); private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>(); static { ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy(); retryTemplate.setBackOffPolicy(backOff); } private final CountDownLatch latch1 = new CountDownLatch(3); private final CountDownLatch latch2 = new CountDownLatch(1); private volatile boolean result; @KafkaListener(topics = "sr1", groupId = "sr1") public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { String recordKey = topic + partition + offset; RetryState retryState = states.get(recordKey); if (retryState == null) { retryState = new DefaultRetryState(recordKey); states.put(recordKey, retryState); } this.result = retryTemplate.execute(c -> { // do your work here this.latch1.countDown(); throw new RuntimeException("retry"); }, c -> { latch2.countDown(); return true; }, retryState); states.remove(recordKey); } }} 和 Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry 每次尝试交付后.在这种情况下,我添加了一个恢复器来处理重试结束后的消息.您还可以执行其他操作,例如停止容器(但可以像在ContainerStoppingErrorHandler中那样在单独的线程上执行此操作). I'm trying to implement a Spring Boot-based Kafka consumer that has some very strong message delivery guarantees, even in a case of an error: messages from a partition must be 
processed in order; if message processing fails, the consumption of the particular partition should be suspended; the processing should be retried with a backoff, until it succeeds. Our current implementation fulfills these requirements: @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRetryTemplate(retryTemplate()); final ContainerProperties containerProperties = factory.getContainerProperties(); containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE); containerProperties.setErrorHandler(errorHandler()); return factory; } @Bean public RetryTemplate retryTemplate() { final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(1000); backOffPolicy.setMultiplier(1.5); final RetryTemplate template = new RetryTemplate(); template.setRetryPolicy(new AlwaysRetryPolicy()); template.setBackOffPolicy(backOffPolicy); return template; } @Bean public ErrorHandler errorHandler() { return new SeekToCurrentErrorHandler(); } However, here, the record is locked by the consumer forever. At some point, the processing time will exceed max.poll.interval.ms and the server will reassign the partition to some other consumer, thus creating a duplicate. Assuming max.poll.interval.ms equal to 5 mins (default) and the failure lasting 30 mins, this will cause the message to be processed ca. 6 times. Another possibility is to return the messages to the queue after N retries (e.g. 3 attempts), by using SimpleRetryPolicy. Then, the message will be replayed (thanks to SeekToCurrentErrorHandler) and the processing will start from scratch, again up to 5 attempts. 
This results in delays forming a series, e.g. 10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ... which is less desirable than a constantly rising one :) Is there any third scenario which could keep the delays forming an ascending series and, at the same time, not create duplicates in the aforementioned example? 解决方案 It can be done with stateful retry - in which case the exception is thrown after each retry, but state is maintained in the retry state object, so the next delivery of that message will use the next delay etc. This requires something in the message (e.g. a header) to uniquely identify each message. Fortunately, with Kafka, the topic, partition and offset provide that unique key for the state. However, currently, the RetryingMessageListenerAdapter does not support stateful retry. You could disable retry in the listener container factory and use a stateful RetryTemplate in your listener, using one of the execute methods that takes a RetryState argument. - pull request issued. EDIT: I just wrote a test case to demonstrate using stateful recovery with a @KafkaListener... /* * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/package org.springframework.kafka.annotation;import static org.assertj.core.api.Assertions.assertThat;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.junit.ClassRule;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.SeekToCurrentErrorHandler;import org.springframework.kafka.support.KafkaHeaders;import org.springframework.kafka.test.rule.KafkaEmbedded;import org.springframework.kafka.test.utils.KafkaTestUtils;import org.springframework.messaging.handler.annotation.Header;import org.springframework.retry.RetryState;import org.springframework.retry.backoff.ExponentialBackOffPolicy;import org.springframework.retry.support.DefaultRetryState;import org.springframework.retry.support.RetryTemplate;import org.springframework.test.annotation.DirtiesContext;import org.springframework.test.context.junit4.SpringRunner;/** * @author Gary Russell * @since 5.0 * */@RunWith(SpringRunner.class)@DirtiesContextpublic class StatefulRetryTests { private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry"; @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1"); @Autowired private Config config; @Autowired private KafkaTemplate<Integer, 
String> template; @Test public void testStatefulRetry() throws Exception { this.template.send("sr1", "foo"); assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.listener1().result).isTrue(); } @Configuration @EnableKafka public static class Config { @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; } @Bean public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return consumerProps; } @Bean public KafkaTemplate<Integer, String> template() { KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); return kafkaTemplate; } @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { return KafkaTestUtils.producerProps(embeddedKafka); } @Bean public Listener listener1() { return new Listener(); } } public static class Listener { private static final RetryTemplate retryTemplate = new RetryTemplate(); private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>(); static { ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy(); retryTemplate.setBackOffPolicy(backOff); } private final CountDownLatch latch1 = new CountDownLatch(3); private final CountDownLatch 
latch2 = new CountDownLatch(1); private volatile boolean result; @KafkaListener(topics = "sr1", groupId = "sr1") public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { String recordKey = topic + partition + offset; RetryState retryState = states.get(recordKey); if (retryState == null) { retryState = new DefaultRetryState(recordKey); states.put(recordKey, retryState); } this.result = retryTemplate.execute(c -> { // do your work here this.latch1.countDown(); throw new RuntimeException("retry"); }, c -> { latch2.countDown(); return true; }, retryState); states.remove(recordKey); } }} and Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry after each delivery attempt. In this case, I added a recoverer to handle the message after retries are exhausted. You could do something else, like stop the container (but do that on a separate thread, like we do in the ContainerStoppingErrorHandler). 这篇关于使用spring-kafka具有消息顺序保证的指数退避的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云! 07-25 10:44