考虑以下代码-
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我创建了一个消费者工厂和一个并发的KafkaListenercontainer工厂。我尚未设置监听器工厂的并发性。
我有一个用@KafkaListener注释的方法
@KafkaListener(topics = "topicName")
public void listen(String message) {
System.out.println("Received Message: " + message);
当我不设置并发属性时,Spring是否会创建1个消费者实例,1个属于消费者工厂中指定组的kafka侦听器容器?
如果我将并发性更改为3,是否会春季创建3个使用者实例,因此在配置使用者工厂和3个侦听器容器时指定了同一使用者组中的3个使用者?
另外,根据并发性,并假设我们现在仅侦听一个主题,我们将有3种方法带有@kafkalistener注释,如果未指定分区,则这3种方法都将侦听不同的分区(由kafka以循环方式提供)。 ?
我是Kafka的新手,想澄清一下我的理解。
最佳答案
当我不设置并发属性时,Spring会创建1个消费者实例,1个属于消费者工厂中指定的组的kafka侦听器容器吗?
您将有一个使用者从该主题的所有分区中获取事件。
如果我将并发性更改为3,是否会春季创建3个使用者实例,因此在配置使用者工厂和3个侦听器容器时指定了同一使用者组中的3个使用者?
您将有3个使用者实例,如果在该主题中至少有3个分区,则每个实例都将从这些分区之一中获取事件。使用者将事件传递到该KafkaListener实例。
您可以更具体。
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToParition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Messasge: " + message"
+ "from partition: " + partition);
}
另外,根据并发性,并假设我们现在仅侦听一个主题,我们将有3种方法带有@kafkalistener注释,如果未指定分区,则这3种方法都将侦听不同的分区(由kafka以循环方式提供)。 ?
这是没有道理的。首先,KafkaListeners是Spring Kafka的高级抽象,如果您有3个消费者(相同的消费者组+聆听),则Kafka完全不会进行任何循环(从消费者的角度来看,与生产者不同)。相同的主题)和该主题中的3个分区,Kafka将重新平衡并为一个使用者分配一个分区,每个使用者将仅从由Kafka分配的分区中获取事件。 Spring Kafka在收到每个使用者中的事件后,将在KafkaListener实例中交付事件。
关于java - Kafka Spring -试图了解事物在幕后的运作方式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48254650/