1 POM file configuration
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.0.RC1</version>
</dependency>
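Both XML files described below must be loaded into the Spring application context. A minimal sketch, assuming they sit on the classpath and are pulled in from a root context file (the name applicationContext.xml is illustrative):
<!-- applicationContext.xml (hypothetical root context) -->
<import resource="classpath:kafka-consumer.xml"/>
<import resource="classpath:kafka-producer.xml"/>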
2 Consumer configuration
2.1 XML configuration file
Create a kafka-consumer.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-autowire="byName">
<!-- Define the consumer properties -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
<entry key="group.id" value="data_center_group"/>
<!-- Troubleshooting: if the consumer receives no messages, make sure auto-startup is set to true in spring-kafka-consumer.xml -->
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="5000"/>
<entry key="session.timeout.ms" value="30000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- Create the consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- The class that actually consumes the messages -->
<bean id="messageListenerConsumerService" class="com.hisense.fusion.kafka.consumer.KafkaConsumerServer"/>
<!-- Container properties for the JuHaoKan (聚好看) media consumer -->
<bean id="mediaContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="${kafka.topic.media}"/>
<property name="messageListener" ref="messageListenerConsumerService"/>
</bean>
<!-- mediaMessageListenerContainer bean; to consume, simply inject this bean -->
<bean id="mediaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="mediaContainerProperties"/>
</bean>
</beans>
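The configuration above references placeholders such as ${kafka.bootstrap.servers} and ${kafka.topic.media}. A minimal sketch of resolving them, assuming a kafka.properties file on the classpath (file name and values are illustrative):
# kafka.properties (hypothetical)
kafka.bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092
kafka.topic.media=media_topic
Then register a placeholder configurer in the XML (this requires the context namespace, declared in kafka-producer.xml below but not yet in kafka-consumer.xml):
<context:property-placeholder location="classpath:kafka.properties"/>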
2.2 Listener configuration
Create the listener class, implementing the MessageListener interface:
package com.hisense.fusion.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;
/**
* Kafka message listener.
* Invoked automatically whenever there are messages to consume.
* @author bill
*
*/
public class KafkaConsumerServer implements MessageListener<String, String> {
protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");
/**
* Invoked automatically by the listener container.
* Consumes the message, with the offset committed automatically;
* business logic goes here.
* (The high-level API provides no offset management, so you cannot
* consume from a specific offset.)
*/
public void onMessage(ConsumerRecord<String, String> record) {
String topic = record.topic();
String value = record.value();
long offset = record.offset();
int partition = record.partition();
LOG.info("topic:{}-partition:{}-offset:{}",topic,partition,offset);
}
}
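Since KafkaMessageListenerContainer participates in the Spring lifecycle, consumption starts as soon as the context loads. A minimal bootstrap sketch (the ConsumerBootstrap class is hypothetical; it assumes kafka-consumer.xml is on the classpath with its placeholders resolved):
package com.hisense.fusion.kafka.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerBootstrap {
public static void main(String[] args) {
// Loading the context creates the listener container, which begins
// polling the configured topic and dispatching to KafkaConsumerServer.
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("kafka-consumer.xml");
context.registerShutdownHook(); // stop the container cleanly on JVM exit
}
}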
3 Producer configuration
3.1 Kafka configuration
Create the kafka-producer.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
default-autowire="byName">
<!-- Define the producer properties -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
<entry key="retries" value="3" />
<entry key="retry.backoff.ms" value="10000" />
<entry key="request.timeout.ms" value="10000" />
<entry key="acks" value="1"/>
<!--<entry key="batch.size" value="5242880" />-->
<!--<entry key="compression.type" value="gzip"/>-->
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.springframework.kafka.support.serializer.JsonSerializer" />
</map>
</constructor-arg>
</bean>
<!-- Create the producerFactory bean used by the KafkaTemplate -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- Create the KafkaTemplate bean; to send messages, simply inject this bean and call its send methods -->
<!--<bean id="producerListener" class="com.hisense.fusion.kafka.producer.KafkaProducerListener" />-->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="defaultTopic" />
</bean>
</beans>
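With the template defined, messages can be sent directly. A minimal sketch, assuming a Spring-managed bean with the KafkaTemplate injected (SimpleSender and the topic name are illustrative):
package com.hisense.fusion.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class SimpleSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(String payload) {
kafkaTemplate.send("media_topic", payload); // explicit topic
kafkaTemplate.sendDefault(payload); // uses defaultTopic from the XML
}
}
Section 3.2 builds a fuller sending service on top of this template.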
3.2 Sending messages
package com.hisense.fusion.kafka.producer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.hisense.fusion.kafka.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* Kafka producer service.
* Use this class to send messages.
*
* @author bill
*/
@Service
public class KafkaProducerServer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* Send a list of records to Kafka. Each record is wrapped in a
* schema/payload envelope; generateSchema (defined elsewhere, not shown)
* builds the schema part from the topic and the record.
*
* @param topic        the target topic
* @param schema       schema name (not used directly by this method)
* @param values       the records to send
* @param key          the field of each record to use as the message key
* @param ifPartition  "0" to derive the partition explicitly from the key hash
* @param partitionNum total number of partitions of the topic
*/
public Boolean sendMessageList(String topic, String schema, List<JSONObject> values, String key, String ifPartition, Integer partitionNum) {
boolean rs = true;
Long startTime = DateUtil.getMillisTimeStamp();
ListenableFuture<SendResult<String, Object>> result = null;
int partitionIndex;
String value;
boolean send;
int size = 0;
String kafkaKey = null;
JSONObject data = new JSONObject();
if (values != null && values.size() > 0) {
size = values.size();
for (JSONObject object : values) {
// Reset for every record so a stale key from the previous iteration is never reused.
kafkaKey = null;
Object keyValue = object.get(key);
if (keyValue instanceof Integer || keyValue instanceof Long || keyValue instanceof String) {
kafkaKey = keyValue.toString();
}
if(kafkaKey==null){
logger.warn("primary key is null,topic:{},key:{},data:{}",topic,key,JSON.toJSONString(object));
continue;
}
try {
data.put("schema", generateSchema(topic,object));
data.put("payload", object);
if (ifPartition.equals("0")) {
// "0" means choose the partition explicitly from the key hash
partitionIndex = getPartitionIndex(kafkaKey, partitionNum);
// result = kafkaTemplate.send(topic, partitionIndex, kafkaKey, object);
result = kafkaTemplate.send(topic, partitionIndex, kafkaKey, data);
} else {
result = kafkaTemplate.send(topic, kafkaKey, data);
}
send = checkResult(result);
}catch (JSONException e){
logger.error("{} schema define is error:{}",topic,e.getMessage());
send = false;
}catch (Exception e){
logger.error("{} send kafka failed :{}",topic,e.getMessage());
send = false;
}
if (!send) {
rs = false;
break;
}
}
}
Long endTime = DateUtil.getMillisTimeStamp();
if (rs) {
logger.info("topic:{},Send Result: true,count : {},takes :{} ms", topic, size, endTime - startTime);
} else {
logger.error("topic:{},Send Result: false,count : {}", topic, size);
}
return rs;
}
/**
* Check the result of a send.
*
* @param res the future returned by KafkaTemplate.send
* @return true if the record was acknowledged with a valid offset
*/
private boolean checkResult(ListenableFuture<SendResult<String, Object>> res) {
if (res != null) {
try {
SendResult r = res.get(); // block until the broker acknowledges the record
/* check the offset in the RecordMetadata; the ProducerRecord itself is not checked */
long offsetIndex = r.getRecordMetadata().offset();
return offsetIndex >= 0;
} catch (InterruptedException e) {
logger.error("kafka exception :{}", e.getMessage());
return false;
} catch (ExecutionException e) {
logger.error("kafka exception :{}", e.getMessage());
return false;
}
} else {
return false;
}
}
/**
* Compute a partition index from the key.
*
* @param key          the message key; a random partition is chosen when null
* @param partitionNum total number of partitions
* @return a partition index in [0, partitionNum)
*/
private int getPartitionIndex(String key, int partitionNum) {
if (key == null) {
Random random = new Random();
return random.nextInt(partitionNum);
}
// Mask off the sign bit so the index is always non-negative, even when
// hashCode() returns Integer.MIN_VALUE (Math.abs(Integer.MIN_VALUE) is still negative).
return (key.hashCode() & Integer.MAX_VALUE) % partitionNum;
}
}
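A sketch of how a caller might use sendMessageList (the IncDataPublisher class, topic, schema name, key field, and partition count are all illustrative assumptions):
package com.hisense.fusion.kafka.producer;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class IncDataPublisher {
@Autowired
private KafkaProducerServer producerServer;
public boolean publishDemo() {
List<JSONObject> rows = new ArrayList<JSONObject>();
JSONObject row = new JSONObject();
row.put("id", 1001L); // "id" is the field used as the message key
row.put("name", "demo");
rows.add(row);
// ifPartition = "0" derives the partition from the key hash, over 8 partitions
return producerServer.sendMessageList("media_topic", "media_schema", rows, "id", "0", 8);
}
}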
3.3 Producer send listener
package com.hisense.fusion.kafka.producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
/**
* Kafka producer listener; enable it in the producer configuration file.
* @author bill
*
*/
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener {
protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");
/**
* Called after a message has been sent successfully.
*/
public void onSuccess(String topic, Integer partition, Object key,
Object value, RecordMetadata recordMetadata) {
LOG.info("========== Kafka send succeeded (log start) ==========");
LOG.info("----------topic:{}", topic);
LOG.info("----------partition:{}", partition);
LOG.info("----------key:{}", key);
// LOG.info("----------value:{}", value);
LOG.info("~~~~~~~~~~ Kafka send succeeded (log end) ~~~~~~~~~~");
}
/**
* Called after a send fails.
*/
public void onError(String topic, Integer partition, Object key,
Object value, Exception exception) {
LOG.error("========== Kafka send failed (log start) ==========");
LOG.error("----------topic:{}", topic);
LOG.error("----------partition:{}", partition);
LOG.error("----------key:{}", key);
LOG.error("----------value:{}", value);
LOG.error("~~~~~~~~~~ Kafka send failed (log end) ~~~~~~~~~~", exception);
}
/**
* The return value indicates whether onSuccess should be invoked for
* successful sends (it does not enable or disable the listener itself).
*/
public boolean isInterestedInSuccess() {
return true;
}
}
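To activate this listener, wire it into the template in kafka-producer.xml: uncomment the producerListener bean and set it on the KafkaTemplate. A sketch, assuming the producerListener property maps to KafkaTemplate's setProducerListener setter in spring-kafka 1.0.x:
<bean id="producerListener" class="com.hisense.fusion.kafka.producer.KafkaProducerListener"/>
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="defaultTopic"/>
<property name="producerListener" ref="producerListener"/>
</bean>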