开心一刻

今早,女朋友给我发微信
她:宝贝,你要记住
她:我可是你女朋友,你相亲就亲,想抱就抱
她:不要老是问我,男生要主动一些
我:可是你上次报警可不是这么说的

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

基础回顾

Spring Boot 集成 Kafka 非常简单,我相信你们都会,但我还是想带你们回顾下;只需要进行以下几步即可完成 Spring Boot 与 Kafka 的集成

  1. 引入依赖

    如果只是单纯的集成,不考虑其他功能,那么添加如下依赖即可

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    

    如果还需要 web 功能,则可以像如下一样添加依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    

    依赖就是如此简单;扯个题外问题

  2. 添加配置

    如果 Kafka 未开启认证,那配置可以非常简单

    spring:
      kafka:
        bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
    

    但实际使用中,往往会开启认证,并对 consumer 做定制化配置,配置往往类似如下

    spring:
      kafka:
        bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
        consumer:
          # 自动提交消费位移
          enable-auto-commit: false
          # 偏移量初始位置
          auto-offset-reset: latest
          # 一次拉取记录最大数
          max-poll-records: 5
          properties:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
            #sasl.mechanism: SCRAM-SHA-256
            #username、password需要调整成实际值
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
            #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
        listener:
          ack-mode: manual
        producer:
          properties:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
            #sasl.mechanism: SCRAM-SHA-256
            #username、password需要调整成实际值
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
            #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";  
    

    也不复杂,相信你们都能看懂

  3. 进行使用

    分两点:消费消息发送消息

    消费消息 实现很简单

    /**
     * @author: 青石路
     */
    @Component
    public class KafkaConsumer {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel")
        public void listenOrder(String message, Acknowledgment acknowledgment) {
            try {
                log.info("收到kafka message: {}", message);
                // TODO 业务处理
            } finally {
                acknowledgment.acknowledge();
            }
        }
    }
    

    监听的 topictp_qsl_order_cancel,消费者组指定为 gid_qsl_order_cancel;这样,消费监听就算完成了

    发送消息 实现同样简单,注入 KafkaTemplate,然后调用其 send 方法即可

    /**
     * @author: 青石路
     */
    @Component
    public class KafkaSender {
    
        private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(String topic, String msg) {
            kafkaTemplate.send(topic, msg).addCallback(
                    success -> {
                        if (success != null) {
                            log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                    success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                        }
                    },
                    failure -> {
                        log.error("消息发送失败:", failure.getCause());
                    }
            );
        }
    }
    

    KafkaTemplate 提供了多个 send 方法

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    我们可以按需选择

上面 3 步都完成后,即可启动应用进行测试了

  1. 消费消息

    这个测试很简单,直接往 tp_qsl_order_cancel topic 中发送一条消息即可

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    点击 发送消息 后,控制台输出

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    消息正常消费,没有任何毛病

  2. 发送消息

    我加了一个 OrderController

    /**
     * @author: 青石路
     */
    @RestController
    @RequestMapping("order")
    public class OrderController {
    
        @Resource
        private KafkaSender kafkaSender;
    
        @GetMapping("add")
        public String add(String orderInfo) {
            // TODO 订单业务处理
            // 下发消息到库存
            kafkaSender.send("tp_qsl_inventory_order_add", orderInfo);
            return "下单成功";
        }
    }
    

    便于测试消息发送;直接发起 http 请求

    http://localhost:8080/order/add?orderInfo=订单完整信息
    

    然后就可以去 tp_qsl_inventory_order_add topic 中看消息是否发送成功

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    消息正常发送,也没有任何毛病

至此,Spring Boot 集成 Kafka 就算大功告成了;如此简单,相信你们都能轻松拿捏

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

Kafka 多源

上述只讲了单 Kafka 源的情况,也就是 消费消息发送消息 针对的是同个 Kafka 源;但实际工作中,同个项目连接多个 Kafka 源的情况是非常常见的,我们就以 2 个 Kafka 源为例,从其中一个源消费消息、向另一个源发送消息,该如何实现?其实也不难,按以下几步调整即可

  1. 配置文件调整

    既然有 2 个 Kafka 源,那么我们的配置文件就需要配置 2 个,类似如下

    spring:
      kafka:
        first:
          bootstrap-servers: 192.168.0.87:9092,192.168.0.88:9092,192.168.0.89:9092
          consumer:
            # 自动提交消费位移
            enable-auto-commit: false
            # 偏移量初始位置
            auto-offset-reset: latest
            # 一次拉取记录最大数
            max-poll-records: 5
            # properties:
              # security.protocol: SASL_PLAINTEXT
              # sasl.mechanism: PLAIN
              #sasl.mechanism: SCRAM-SHA-256
              #username、password需要调整成实际值
              # sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
              #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
          listener:
            ack-mode: manual
        second:
          bootstrap-servers: 192.168.0.90:9092
          #producer:
            #properties:
              #security.protocol: SASL_PLAINTEXT
              #sasl.mechanism: PLAIN
              #sasl.mechanism: SCRAM-SHA-256
              #username、password需要调整成实际值
              #sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
              #sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
    

    这里的 firstsecond 不是固定的,你们想怎么命名就怎么命名;既然这么灵活,那 Spring Boot 肯定是不支持的,那么如上配置,Spring Boot 是识别不了的,相当于没配,此时去启动应用,Spring Boot 会启用默认配置去连接 localhost:9092

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    所以我们需要自定义配置 Kafka,而一旦我们进行了自定义,那么 Spring Boot 则不会启用默认配置

  2. 自定义配置 Kafka

    针对每个 Kafka 源单独配置,配置内容比较固定

    1. FirstKafkaConfig

      /**
       * 第一个Kafka配置
       * @author: 青石路
       */
      @Configuration
      public class FirstKafkaConfig {
      
          @ConfigurationProperties(prefix = "spring.kafka.first")
          @Bean("firstKafkaProperties")
          public KafkaProperties firstKafkaProperties() {
              return new KafkaProperties();
          }
      
          @Bean("firstKafkaTemplate")
          public KafkaTemplate<String, String> firstKafkaTemplate() {
              return new KafkaTemplate<>(firstProducerFactory());
          }
      
          @Bean("firstKafkaListenerContainerFactory")
          public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> fisrtKafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(firstConsumerFactory());
              factory.getContainerProperties().setAckMode(firstKafkaProperties().getListener().getAckMode());
              return factory;
          }
      
          @Bean("firstConsumerFactory")
          public ConsumerFactory<String, String> firstConsumerFactory() {
              return new DefaultKafkaConsumerFactory<>(firstKafkaProperties().buildConsumerProperties());
          }
      
          @Bean("firstProducerFactory")
          public DefaultKafkaProducerFactory<String, String> firstProducerFactory() {
              return new DefaultKafkaProducerFactory<>(firstKafkaProperties().buildProducerProperties());
          }
      }
      
    2. SecondKafkaConfig

      /**
       * 第二个Kafka配置
       * @author: 青石路
       */
      @Configuration
      public class SecondKafkaConfig {
      
          @ConfigurationProperties(prefix = "spring.kafka.second")
          @Bean("secondKafkaProperties")
          public KafkaProperties secondKafkaProperties() {
              return new KafkaProperties();
          }
      
          @Bean("secondKafkaTemplate")
          public KafkaTemplate<String, String> secondKafkaTemplate() {
              return new KafkaTemplate<>(secondProducerFactory());
          }
      
          @Bean("secondKafkaListenerContainerFactory")
          public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> fisrtKafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(secondConsumerFactory());
              return factory;
          }
      
          @Bean("secondConsumerFactory")
          public ConsumerFactory<String, String> secondConsumerFactory() {
              return new DefaultKafkaConsumerFactory<>(secondKafkaProperties().buildConsumerProperties());
          }
      
          @Bean("secondProducerFactory")
          public DefaultKafkaProducerFactory<String, String> secondProducerFactory() {
              return new DefaultKafkaProducerFactory<>(secondKafkaProperties().buildProducerProperties());
          }
      }
      

    重点在

    多源之间不要配重、不要配混

  3. 调整消息监听与消息发送

    因为配置了多源,那么 KafkaListenerContainerFactory 也对应配置了多个,所以我们要指定用哪个 KafkaListenerContainerFactory 来创建消息监听器

    @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel")
    
    // 调整成
    
    @KafkaListener(topics = "tp_qsl_order_cancel", groupId = "gid_qsl_order_cancel", containerFactory = "firstKafkaListenerContainerFactory")
    

    消费消息 端就算调整完成;同理,KafkaTemplate 也配置了多个,那么发送消息的时候也需要指定用哪个 KafkaTemplate 来发送

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // 调整成
    
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    

    发送消息 端也就调整完成

都调整完成后,我们启动应用,会发现启动失败,并提示如下信息

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

这特喵的,跟预想的不一样吖

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

遇到问题先不要慌,我们仔细看下提示信息,我给你们翻译一下

所以处理方式就来了,使用 @Primary 来提高某个 KafkaProperties 实例的优先级,KafkaAnnotationDrivenConfiguration 就不会懵圈了,会使用优先级高的 KafkaProperties 实例

我们直接提高 firstKafkaProperties 的优先级

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
@Primary
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

再启动应用,发现正常启动了;你们就可以进行 消费消息发送消息 测试了,我就不演示了,反正我测试都是通过的,不信?不信就不信,你能把我怎么样嘛

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

启停配置化

Kafka 不管是单源还是多源,应用进行集成,都是非常合理的需求,我们开发做对应的实现也是应该的;但如下这个需求我多少是有点抵触的

这里的启停指的是 启用停用;演示的时候,哪些 Kafka 源能正常使用就启用这些,哪些还不能使用就停用哪些,同时业务代码中也需要做适配调整。面对这个需求,你们说是不是不合理?所以你们能理解我的抵触了吧。但为了更好的演示,给甲方爸爸留下专业的印象,增加开关貌似是当下最合适的无奈之选,极不情愿的开启改造之旅

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP
  1. 增加开关配置

    在配置文件中增加开关配置,每个 Kafka 源有其独立的配置,有几个源就配置几个开关

    spring:
      kafka:
        first:
          enabled: true
          ...
        second:
          enabled: true
          ...
    

    enabled 配置成 true 表示启用,false 表示停用

  2. 自定义配置适配开关

    需要根据开关值来决定是否启用 FirstKafkaConfigSecondKafkaConfig,Spring Boot 正好提供了一个具有该功能的注解:ConditionalOnProperty,直接安排上

    /**
     * 第一个Kafka配置
     * @author: 青石路
     */
    @Configuration
    @ConditionalOnProperty(name = "spring.kafka.first.enabled", havingValue = "true")
    public class FirstKafkaConfig {
        ...
    
    /**
     * 第二个Kafka配置
     * @author: 青石路
     */
    @Configuration
    @ConditionalOnProperty(name = "spring.kafka.second.enabled", havingValue = "true")
    public class SecondKafkaConfig {
        ...
    

    这样就实现了通过开关来 启停 Kafka 源

  3. 消费消息与发送消息适配开关

    消费端适配很简单

    /**
     * @author: 青石路
     */
    @Component
    @ConditionalOnProperty(name = "spring.kafka.first.enabled", havingValue = "true")
    public class KafkaConsumer {
        ...
    

    发送端适配则有点不一样,方式有多种,我提供一种;修改 KafkaSender,改 2 处即可

    KafkaTemplate 调整成非强制依赖,将 @Autowiredrequired 设置成 false

    @Autowired
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    // 调整成
    
    @Autowired(required = false)
    @Qualifier("secondKafkaTemplate")
    public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    

    使用 KafkaTemplate 时做 null 判断

    public void send(String topic, String msg) {
        kafkaTemplate.send(topic, msg).addCallback(
                success -> {
                    if (success != null) {
                        log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                    }
                },
                failure -> {
                    log.error("消息发送失败:", failure.getCause());
                }
        );
    }
    
    // 调整成
    
    public void send(String topic, String msg) {
        if (kafkaTemplate == null) {
            log.warn("未启用secondKafka,不发送消息");
            return;
        }
        kafkaTemplate.send(topic, msg).addCallback(
                success -> {
                    if (success != null) {
                        log.info("消息发送成功: Topic={}, Partition={}, Offset={}",
                                success.getRecordMetadata().topic(), success.getRecordMetadata().partition(), success.getRecordMetadata().offset());
                    }
                },
                failure -> {
                    log.error("消息发送失败:", failure.getCause());
                }
        );
    }
    

至此改造就算完成;开关都为 true 的情况下,效果与未加开关前的多源是一致的,也就是正常的,我已经测过了,你们不放心的话自己再去测试一下;开关都为 false 时,相当于没注册消费监听器,也就相当于没有消费者,那么往 tp_qsl_order_cancel topic 中发消息,是没有消费者消费消息的,那么控制台就不会有任何输出,同理,此时的 KafkaTemplate 是没有注册成功的(也就是 null),发起 http 请求

http://localhost:8080/order/add?orderInfo=大订单

控制台输出如下

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

正是我们想要的效果,说明都为 false 的情况也是正确的;接下来我们看下 false、true 的情况

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

好家伙,直接启动失败!但这个问题我们前面碰到过,那么如何处理呢?用 @Primary 标记 secondKafkaProperties ?假设我们这么做了,那开关都为 true 的情况下,KafkaProperties 实例岂不是有多个 Primary,Spring Boot 又会懵圈,不知道该使用哪个 KafkaProperties 实例,显然这种方式行不通;我们把问题拓展下,多个 KafkaProperties 实例存在的情况下,需要动态指定一个 Primary,但不能是 Spring Boot 自动配置的那个,即

除了这个,随便给哪个 KafkaProperties 实例指定成 Primary 都是没问题的,因为我们的业务代码中都明确指定了使用的是我们自定义的 kafka,所以我们需要在 Bean 实例化之前修改某个 KafkaProperties 的 BeanDefinition,设置其 Primary 为 true;实现方式有很多,我这里提供一种:BeanFactoryPostProcessor

/**
 * @author: 青石路
 */
@Component
public class KafkaPrimaryProcessor implements BeanFactoryPostProcessor {

    private static final Logger log = LoggerFactory.getLogger(KafkaPrimaryProcessor.class);

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        String[] beanNames = beanFactory.getBeanNamesForType(KafkaProperties.class);
        if (beanNames.length <= 1) {
            return;
        }
        for (String beanName : beanNames) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
            // springboot的自动配置
            if (beanName.contains(KafkaProperties.class.getName())) {
                continue;
            }
            log.info("多KafkaProperties,指定primary[{}]", beanName);
            beanDefinition.setPrimary(true);
            return;
        }
    }
}

这个代码相信你们都能看懂,会从多个 KafkaProperties BeanDefinition 中取第一个(除了自动配置的),设置其 Primary 为 true,所以我们还需要调整下 firstKafkaProperties,拿掉其 @Primary

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
@Primary
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

// 调整成

@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean("firstKafkaProperties")
public KafkaProperties firstKafkaProperties() {
    return new KafkaProperties();
}

这么调整之后,无论是有几个 Kafka 源,以及如何启停这些源,都能正常运转,是不是很优秀,值得鼓掌!

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

话说,需求至此已经算完美实现了,可以完结了,但作为一个开发,尤其是一个有追求的开发,还有一个疑点未得到解决,心里始终不舒坦,是什么疑点呢,我们继续往下看

排除自动配置

既然是我们自定义配置 Kafka,不再依赖 Spring Boot 的自动配置,我们是不是可以排除掉 Spring Boot 的 Kafka 自动配置?理论上来说是可行的,那就干呗;直接排除掉 KafkaAutoConfiguration.class

/**
 * @author: 青石路
 */
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

既然排除了自动配置,那么也就不需要指定 KafkaProperties 的 Primary 了,KafkaPrimaryProcessor 直接删掉,其他不用调整;将开关都设置成 true,我们启动应用后测试下

  1. 发送消息

    直接 http 请求

    http://localhost:8080/order/add?orderInfo=排除自动配置
    

    日志显示发送成功

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    我们在看下 Topic tp_qsl_inventory_order_add 中消息

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    发送消息是没问题的

  2. 消费消息

    往 Topic tp_qsl_order_cancel 中发送消息

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    点击 发送消息 后,发现控制台并没有任何输出!!!

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    先别慌,我们冷静分析下,控制台没有任何输出说明消费者没注册成功,也就是 @KafkaListener 没生效,为什么没生效,肯定是没有被解析,谁解析它呢,KafkaListener 中应该有说明

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    已经描述的很清楚

    EnableKafka 方便点,我们使用它

    /**
     * @author: 青石路
     */
    @SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
    @EnableKafka
    public class KafkaApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
    

    重新启动应用,会发现控制台有如下输出

    SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

    消费消息也正常了;因为重启应用了,保险起见,发送消息最好再测一次,记得测!!!

至此,心中疑点得以解决,如此才算完美解决!

SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩-LMLPHP

总结

  1. Kafka 多源实现,大家需要掌握,至于启停配置化,大家就当看个热闹

    但是启停配置化的实现(@ConditionalOnProperty),还是值得大家掌握的

    Spring Boot 的条件注解非常多,在 Spring Boot 内部被广泛使用,感兴趣的可以查看:spring-boot-2.0.3源码篇 - @Configuration、Condition与@Conditional

  2. 如果不使用Spring Boot的自动配置,建议把对应的自动配置类排除掉

    自动配置手动配置 同时存在的话可能会产生冲突,就像文中的 KafkaProperties 多实例;直接排除可能会导致缺少某些功能,肯定是没有启用这些功能的依赖,细心去寻找依赖并启用即可

  3. 完整代码:spring-boot-kafka

12-24 14:42