本文介绍了如何在 spring kafka 中确认当前偏移量以进行手动提交的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我第一次使用 Spring Kafka,但无法在我的消费者代码中使用 Acknowledgement.acknowledge() 方法进行手动提交.如果我的消费者配置或侦听器代码中缺少任何内容,请告诉我.或者还有其他方法可以根据条件处理确认偏移量.在这里,我正在寻找解决方案,如果偏移量没有手动提交/确认,它应该由消费者选择相同的消息/偏移量.

I am using Spring Kafka first time and I am not able to use Acknowledgement.acknowledge() method for manual commit in my consumer code. please let me know if anything missing in my consumer configuration or listener code. or else is there other way to handle acknowledge offset based on condition. Here i'm looking solution like if the offset is not committed/ acknowledge manually, it should pick same message/offset by consumer.

配置

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}

听众

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    }
    value++;
}

推荐答案

将 enable-auto-commit 属性设置为 false:

Set the enable-auto-commit property to false:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

将确认模式设置为 MANUAL_IMMEDIATE:

Set the ack-mode to MANUAL_IMMEDIATE:

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

然后,在您的使用者/侦听器代码中,您可以手动提交偏移量,如下所示:

Then, in your consumer/listener code, you can commit the offset manually, like this:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}

更新:我为此创建了一个小型 POC.在此处查看,可能对您有所帮助.

Update: I created a small POC for this. Check it out here, might help you.

这篇关于如何在 spring kafka 中确认当前偏移量以进行手动提交的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-22 23:38