本文介绍了ConsumerRecords 在Kafka、Java 中始终为空,但Future<RecordMetadata> 是空的.isDone 方法结果为真的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时删除!!

我尝试将消息发送到确切的 Kafka 主题,然后从中接收消息.我有 3 个用于消费者、生产者和主题的配置类:

I try to send message to exact Kafka topic and then receive message from it. I have 3 configuration classes for Consumer, Producer and Topic:

public class KafkaTopicConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        return new KafkaAdmin(configs);
    }
}
public class KafkaConsumerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public Consumer<String, String> consumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                globalConfiguration.kafka().bootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                globalConfiguration.kafka().groupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        return new KafkaConsumer<>(props);
    }
}
public class KafkaProducerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(configProps);
    }
}

GlobalConfiguration 类存储我的所有属性.对于卡夫卡:

GlobalConfiguration class stores all my properties. For Kafka:

bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"

那我就这样发消息

    private void sendMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
        kafkaProducer.flush();
        boolean isSent = sendResponse.isDone();
    }

我检查消息是否使用 sendResponse.isDone() 发送并返回 true.但后来我尝试接收消息:

I check if message is sent with sendResponse.isDone() and it returns true.But then I try to receive message:

protected String receiveMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> consumerRecords =
                consumer.poll(Duration.ofMillis(10000));
        consumerRecords.isEmpty();
    }

而且 ConsumerRecords 总是空的.可能是什么问题?

And ConsumerRecords always empty. What problem it can be?

推荐答案

如果你想消费之前发送的记录,而不是轮询你启动消费后10秒内发送的记录,则需要添加

If you want to consume previously sent records, rather than poll records sent within 10 seconds of you starting the consumer, you need to add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            OffsetResetStrategy.EARLIEST.name().toLowerCase());

这篇关于ConsumerRecords 在Kafka、Java 中始终为空,但Future<RecordMetadata> 是空的.isDone 方法结果为真的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

1403页,肝出来的..

09-07 01:50