前言
这次跟大家分享kafka消费的另一种接入实现。其实原因是因为目前这个项目的框架太老了,springboot还是1.5的,直接用注解@KafkaListener无法消费的问题。我也不想调这个框架,没工时不说,万一再整出兼容性问题,那问题就大了,而且现在时间太赶了。
一、目标场景
- 目前是物联网设备的流水上报后,会存ES,同时经过物模型解析后,会往下游kafka推送信息。
- 下游系统接收kafka的设备流水,进行流水解析,解析成业务数据,做业务融合。
二、使用步骤
1.引入库
代码如下(示例):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- spring-kafka内部依赖kafka-clients升级补偿 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
其实上面看起来说的是排除springboot的kafka-clients,引入自定义的kafka-clients做为升级补偿。
编译、运行都不报错,但是使用@KafkaListener注解消费kafka信息,会报错,大致意思就是springframe版本低。应该就是低版本springboot的依赖springframe与高版本kafka-client依赖的springframe不匹配导致。
没有去调整框架,具体就不发散了。
import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* 消费者listener
*
* @author zhengwen
**/
@Slf4j
@Component
//@Lazy
public class KafkaListenConsumer {
@Resource
private DataTransService dataTransService;
/**
* 设备流水listenner
*
* @param records 消费信息
* @param ack Ack机制
*/
@KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")
public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
log.debug("=====设备流水deviceFlowListen消费者接收信息====");
try {
for (ConsumerRecord record : records) {
log.debug("---开启线程解析设备流水数据:{}", record.toString());
dataTransService.deviceFlowTransSave(record);
}
} catch (Exception e) {
log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
} finally {
//手动提交偏移量
ack.acknowledge();
}
}
}
上面就是我最初直接使用注解写的消费方法。
2.主动启动消费
import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
@Slf4j
public class DeviceFlowConsumerServerStart {
@Resource
private DataTransService dataTransService;
@Value("${easylinkin.analyze.device.flow.topic.consumer}")
private String topic;
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private String kafkaServiceUrl;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@PostConstruct
void start() {
log.info("设备流水消费kafka服务启动!");
//配置信息
Properties props = new Properties();
//先自定义的设置下,再用配置里的覆盖
//声明kafka的地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);
//每个消费者分配独立的消费者组编号
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//如果value合法,则自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//设置多久一次更新被消费消息的偏移量
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//自动重置offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerParams);
//订阅消费topic
kafkaConsumer.subscribe(Arrays.asList(topic));
startConsumer(kafkaConsumer);
log.info("设备流水消费kafka服务启动完成!");
}
private void startConsumer(KafkaConsumer<String, String> kafkaConsumer) {
new Thread(()->{
while (true){
try {
ConsumerRecords<String,String> poll = kafkaConsumer.poll(2000);
Iterable<ConsumerRecord<String,String>> records = poll.records(topic);
Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
while (iterator.hasNext()){
dataTransService.deviceFlowTransSave(iterator.next());
}
}catch (Exception e){
log.error("消费失败",e);
startConsumer(kafkaConsumer);
break;
}
}
}).start();
}
}
这里设置订阅后,启用线程消费,希望是消费异常不要把这里主线程搞挂了。因为我这里消费信息,会用一个dataTransService做设备流水的进一步解析,做业务融合,可能就涉及到事物嵌套的问题。
总结
- 针对老项目的另一种kafka消费接入方式
- 老springboot是真狗,各种接入不丝滑
- 就写到这里,希望能帮到大家,uping!