前言

       这次跟大家分享kafka消费的另一种接入实现。其实原因是因为目前这个项目的框架太老了,springboot还是1.5的,直接用注解@KafkaListener无法消费的问题。我也不想调这个框架,没工时不说,万一再整出兼容性问题,那问题就大了,而且现在时间太赶了。


一、目标场景

  1. 目前是物联网设备的流水上报后,会存ES,同时经过物模型解析后,会往下游kafka推送信息。
  2. 下游系统接收kafka的设备流水,进行流水解析,解析成业务数据,做业务融合。

二、使用步骤

1.引入库

代码如下(示例):

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<exclusions>
		<exclusion>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<!-- spring-kafka内部依赖kafka-clients升级补偿 -->
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.4.0</version>
</dependency>

       其实上面看起来说的是排除springboot的kafka-clients,引入自定义的kafka-clients做为升级补偿。
       编译、运行都不报错,但是使用@KafkaListener注解消费kafka信息,会报错,大致意思就是springframe版本低。应该就是低版本springboot的依赖springframe与高版本kafka-client依赖的springframe不匹配导致。
       没有去调整框架,具体就不发散了。


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * 消费者listener
 *
 * @author zhengwen
 **/
@Slf4j
@Component
//@Lazy
public class KafkaListenConsumer {

    @Resource
    private DataTransService dataTransService;

    /**
     * 设备流水listenner
     *
     * @param records 消费信息
     * @param ack     Ack机制
     */
    @KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")
    public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
        log.debug("=====设备流水deviceFlowListen消费者接收信息====");
        try {

            for (ConsumerRecord record : records) {
                log.debug("---开启线程解析设备流水数据:{}", record.toString());
                dataTransService.deviceFlowTransSave(record);
            }

        } catch (Exception e) {
            log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }
}

       上面就是我最初直接使用注解写的消费方法。

2.主动启动消费


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

@Slf4j
public class DeviceFlowConsumerServerStart {
    
    @Resource
    private DataTransService dataTransService;

    @Value("${easylinkin.analyze.device.flow.topic.consumer}")
    private String topic;

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServiceUrl;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;


    @PostConstruct
    void start() {
        log.info("设备流水消费kafka服务启动!");

        //配置信息
        Properties props = new Properties();
        //先自定义的设置下,再用配置里的覆盖
        //声明kafka的地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);
        //每个消费者分配独立的消费者组编号
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //如果value合法,则自动提交偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        //设置多久一次更新被消费消息的偏移量
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        //自动重置offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerParams);
        //订阅消费topic
        kafkaConsumer.subscribe(Arrays.asList(topic));
        startConsumer(kafkaConsumer);

        log.info("设备流水消费kafka服务启动完成!");
    }

    private void startConsumer(KafkaConsumer<String, String> kafkaConsumer) {
        new Thread(()->{
            while (true){
                try {
                    ConsumerRecords<String,String> poll = kafkaConsumer.poll(2000);
                    Iterable<ConsumerRecord<String,String>> records = poll.records(topic);
                    Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                    while (iterator.hasNext()){
                        dataTransService.deviceFlowTransSave(iterator.next());
                    }
                }catch (Exception e){
                    log.error("消费失败",e);
                    startConsumer(kafkaConsumer);
                    break;
                }
            }
        }).start();
    }

}

       这里设置订阅后,启用线程消费,希望是消费异常不要把这里主线程搞挂了。因为我这里消费信息,会用一个dataTransService做设备流水的进一步解析,做业务融合,可能就涉及到事物嵌套的问题。


总结

  1. 针对老项目的另一种kafka消费接入方式
  2. 老springboot是真狗,各种接入不丝滑
  3. 就写到这里,希望能帮到大家,uping!
03-28 05:08