<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<!-- 指定消费组名 -->
<entry key="group.id" value="friend-group"/>
<entry key="enable.auto.commit" value="false"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="1"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean> <!-- 消费消息的服务类 -->
<bean id="messageListernerConsumerService" class="com.zhaopin.consumer.ConsumerService"/> <!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="friend"/>
<!--<constructor-arg>
<list>
<value>zptopic</value>
<value>ssmk</value>
<value>friend</value>
</list>
</constructor-arg>-->
<property name="messageListener" ref="messageListernerConsumerService"/> <!-- 设置如何提交offset -->
<property name="ackMode" value="MANUAL_IMMEDIATE"/>
</bean> <!-- 单线程消息监听容器 -->
<!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>--> <!-- 多线程消息监听容器 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="5"/>
</bean> </beans>

消费者监听类实现AcknowledgingMessageListener这个监听器,可以实现手动提交offset:

package com.zhaopin.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zhaopin.consumer.dto.FriendRelationDto;
import com.zhaopin.consumer.dto.MessageDto;
import com.zhaopin.consumer.service.FriendRelationService;
import com.zhaopin.pojo.TbPerson;
import com.zhaopin.service.PersonService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; /**
* Created by SYJ on 2017/3/21.
*/
@Service
public class ConsumerService implements AcknowledgingMessageListener<Integer, String> { private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
private static List<TbPerson> personList = new ArrayList<TbPerson>();
private static final Integer INSERT_BATCH_COUNT = 50; @Autowired
private PersonService personService; @Autowired
private FriendRelationService friendRelationService; /**
* 消息监听方法
* @param record
*/
/*@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
logger.info("Before receiving:" + record.toString());
String value = record.value();
MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
try {
friendRelationService.process(message.getData());
} catch (IOException e) {
e.printStackTrace();
}
//insert(record);
//insertBatch(record);
}*/ /**
* 单个TbPerson入库
* @param record
*/
public void insert(ConsumerRecord<Integer, String> record){
String value = record.value();
TbPerson person = JSON.parseObject(value, TbPerson.class);
personService.insert(person);
System.out.println("Single data writing to the database:" + record);
} /**
* 批量TbPerson入库
* @param record
*/
public void insertBatch(ConsumerRecord<Integer, String> record){
String value = record.value();
TbPerson person = JSON.parseObject(value, TbPerson.class);
personList.add(person);
if (personList.size() == INSERT_BATCH_COUNT) {
personService.insertBatch(personList);
System.out.println("Batch data writing to the database:" + personList);
personList.clear();
}
} @Override
public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
logger.info("Before receiving:" + record.toString());
String value = record.value();
MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
try {
friendRelationService.process(message.getData());
acknowledgment.acknowledge();//提交offset
} catch (IOException e) {
e.printStackTrace();
}
//insert(record);
//insertBatch(record);
}
}
04-17 01:37