本文介绍了创建不带注释的KafkaListener&;不带弹簧靴子的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试为一个主题创建一个Kafka Consumer,而不使用@KafkaListener注释。我之所以要这样做,是因为我试图在不使用SpringBoot的情况下,基于Application.Properties动态创建侦听器。
我想最好的方法是手动创建一个KafkaListenerContainerFactory。谁能提供一个在它自己的类中如何做到这一点的例子吗?
推荐答案
- 带弹簧
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(String topic) {
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(new MyMessageListener());
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
listenerContainer.setAutoStartup(false);
// bean name is the prefix of kafka consumer thread name
listenerContainer.setBeanName("kafka-message-listener");
return listenerContainer;
}
private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
return props;
}
static class MyMessageListener implements MessageListener<String, String>
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// do something
}
}
- 无弹簧
kafka documentation非常有用。下面是它的一个用法示例。
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
这篇关于创建不带注释的KafkaListener&;不带弹簧靴子的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!