一、添加依赖项
compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'
二、发消息(生产者)
2.1 xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--kafka的服务地址,多个地址用英文逗号连接-->
<entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
<entry key="group.id" value="0"/>
<entry key="retries" value="10"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean> <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<!--topic名字-->
<property name="defaultTopic" value="dc-monitor"/>
</bean> </beans>
2.2 发送代码示例
@Test
public void send() throws InterruptedException, ExecutionException, TimeoutException {
KafkaTemplate template = context.getBean(KafkaTemplate.class);
String msg = "中华人民共和国万岁!";
ListenableFuture<SendResult<String, String>> future = template.sendDefault(msg);
SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
System.out.println("发送成功=====>" + msg);
}
三、收消息(消费者)
3.1 xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--kafka的服务地址,多个地址用英文逗号连接-->
<entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
<entry key="group.id" value="0"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg ref="consumerProperties"/>
</bean> <!-- 实际执行消息消费的类 -->
<bean id="kafkaConsumer" class="com.cnblogs.yjmyzz.consumer.DemoKafkaConsumer"/> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!--topic名字-->
<constructor-arg value="dc-monitor"/>
<property name="messageListener" ref="kafkaConsumer"/>
</bean> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean> </beans>
3.2 接收代码示例
public class DemoKafkaConsumer implements MessageListener<String, String> { @Override
public void onMessage(ConsumerRecord<String, String> data) {
System.out.println("收到消息=====>" + data.value());
}
}