考虑以下代码-

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG,
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}


我创建了一个消费者工厂和一个并发的KafkaListenercontainer工厂。我尚未设置监听器工厂的并发性。
我有一个用@KafkaListener注释的方法

@KafkaListener(topics = "topicName")
public void listen(String message) {
    System.out.println("Received Message: " + message);


当我不设置并发属性时,Spring是否会创建1个消费者实例,1个属于消费者工厂中指定组的kafka侦听器容器?

如果我将并发性更改为3,是否会春季创建3个使用者实例,因此在配置使用者工厂和3个侦听器容器时指定了同一使用者组中的3个使用者?

另外,根据并发性,并假设我们现在仅侦听一个主题,我们将有3种方法带有@kafkalistener注释,如果未指定分区,则这3种方法都将侦听不同的分区(由kafka以循环方式提供)。 ?

我是Kafka的新手,想澄清一下我的理解。

最佳答案

当我不设置并发属性时,Spring会创建1个消费者实例,1个属于消费者工厂中指定的组的kafka侦听器容器吗?


  您将有一个使用者从该主题的所有分区中获取事件。


如果我将并发性更改为3,是否会春季创建3个使用者实例,因此在配置使用者工厂和3个侦听器容器时指定了同一使用者组中的3个使用者?


  您将有3个使用者实例,如果在该主题中至少有3个分区,则每个实例都将从这些分区之一中获取事件。使用者将事件传递到该KafkaListener实例。
  
  您可以更具体。


@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"),
    @PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToParition(
  @Payload String message,
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Messasge: " + message"
        + "from partition: " + partition);
}


另外,根据并发性,并假设我们现在仅侦听一个主题,我们将有3种方法带有@kafkalistener注释,如果未指定分区,则这3种方法都将侦听不同的分区(由kafka以循环方式提供)。 ?


  这是没有道理的。首先,KafkaListeners是Spring Kafka的高级抽象,如果您有3个消费者(相同的消费者组+聆听),则Kafka完全不会进行任何循环(从消费者的角度来看,与生产者不同)。相同的主题)和该主题中的3个分区,Kafka将重新平衡并为一个使用者分配一个分区,每个使用者将仅从由Kafka分配的分区中获取事件。 Spring Kafka在收到每个使用者中的事件后,将在KafkaListener实例中交付事件。

关于java - Kafka Spring -试图了解事物在幕后的运作方式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48254650/

10-10 06:08