1. Creating a Cluster
http://kafka.apache.org/documentation/#quickstart
One sentence there strikes me as especially important: For Kafka, a single broker is just a cluster of size one.
1.1. Command-Line Operations
# Unpack the archive
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

# Start ZooKeeper (runs in the foreground; use a separate terminal for the remaining steps)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the first Kafka broker
bin/kafka-server-start.sh config/server.properties &

# Copy the broker config for two more brokers
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

# Edit config/server-1.properties:
#   broker.id=1
#   listeners=PLAINTEXT://:9093
#   log.dirs=/tmp/kafka-logs-1
# Edit config/server-2.properties:
#   broker.id=2
#   listeners=PLAINTEXT://:9094
#   log.dirs=/tmp/kafka-logs-2
# (The stock file already defines log.dirs, so change that key; log.dir is only read when log.dirs is unset.)

# Start the two additional brokers
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

# Create a topic replicated across all three brokers
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic myTopic

# Describe the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
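The two override files follow a regular pattern: broker N gets broker.id=N, listens on port 9092 + N, and stores its logs under /tmp/kafka-logs-N. As a rough illustration only, that pattern can be written down in plain Java with java.util.Properties. The class and method names are hypothetical, the port and directory formula is my own generalization from the two files in this walkthrough, and the sketch writes log.dirs, the key that the stock config/server.properties defines.

```java
import java.util.Properties;

class BrokerOverridesSketch {

    // Port of the first broker (broker.id=0) in this walkthrough.
    static final int BASE_PORT = 9092;

    // Builds the three settings that must differ per broker on a single machine.
    // Assumption: broker N uses port BASE_PORT + N and /tmp/kafka-logs-N.
    static Properties overridesFor(int brokerId) {
        Properties overrides = new Properties();
        overrides.setProperty("broker.id", Integer.toString(brokerId));
        overrides.setProperty("listeners", "PLAINTEXT://:" + (BASE_PORT + brokerId));
        overrides.setProperty("log.dirs", "/tmp/kafka-logs-" + brokerId);
        return overrides;
    }

    public static void main(String[] args) {
        String[] keys = {"broker.id", "listeners", "log.dirs"};
        for (int id = 1; id <= 2; id++) {
            Properties overrides = overridesFor(id);
            System.out.println("config/server-" + id + ".properties:");
            for (String key : keys) {
                System.out.println("    " + key + "=" + overrides.getProperty(key));
            }
        }
    }
}
```

Each broker needs its own id, port, and log directory only because all three run on the same host here; on separate machines the port and directory could stay at their defaults.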
1.2. Graphical Interface
Besides the command line, you can also inspect the cluster with kafka-manager.
2. Integrating Kafka with Spring Boot
2.1. Adding the Maven Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.2. Configuration
spring:
  kafka:
    bootstrap-servers: 10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094
    consumer:
      group-id: myGroup
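Spring Boot passes these two settings to the Kafka client under the client's own property names, bootstrap.servers and group.id. The sketch below, in plain Java with no Kafka dependency, builds that property set and checks that every bootstrap entry is a well-formed host:port pair. ClientPropsSketch and its methods are hypothetical names introduced for illustration; they are not part of Spring or Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ClientPropsSketch {

    // Maps the two YAML settings onto the Kafka client's property names.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        return props;
    }

    // Extracts the port of each comma-separated host:port entry,
    // rejecting entries that lack a host or a port.
    static List<Integer> ports(String bootstrapServers) {
        List<Integer> ports = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon < 1 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
            }
            ports.add(Integer.parseInt(hostPort.substring(colon + 1)));
        }
        return ports;
    }

    public static void main(String[] args) {
        String servers = "10.123.52.76:9092,10.123.52.76:9093,10.123.52.76:9094";
        Properties props = consumerProps(servers, "myGroup");
        System.out.println(props.getProperty("group.id"));
        System.out.println(ports(props.getProperty("bootstrap.servers")));
    }
}
```

Listing all three brokers is a convenience rather than a requirement: the client only needs one reachable entry to discover the rest of the cluster, and the extra entries help when a broker is down at startup.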
2.3. Sending and Receiving Messages
package com.cjs.boot.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyListener {

    @KafkaListener(topics = "myTopic")
    public void processMessage2(String content) {
        log.info("【Received Message From 'myTopic'】: {}", content);
    }
}
package com.cjs.boot.controller;

import com.cjs.boot.response.RespResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.ModelAndView;

@Controller
@RequestMapping("/message")
public class MessageController extends BaseController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Renders the form used to type a message.
    @GetMapping("/add.html")
    public ModelAndView add() {
        return new ModelAndView("message/add");
    }

    // Publishes the submitted text to myTopic, keyed by the current timestamp.
    @PostMapping("/send.json")
    @ResponseBody
    public RespResult send(String text) {
        // send() is asynchronous; the returned future is not inspected here,
        // so success is reported before the broker has acknowledged the record.
        ListenableFuture<SendResult<String, String>> result =
                kafkaTemplate.send("myTopic", String.valueOf(System.currentTimeMillis()), text);
        return RespResult.success();
    }
}
2018-05-04 12:36:59.736 INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-04 12:36:59.736 INFO 7552 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-05-04 12:36:59.830 INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener : 【Received Message From 'myTopic'】: 大家好啊
2018-05-04 12:37:24.107 INFO 7552 --- [ntainer#0-0-C-1] com.cjs.boot.message.MyListener : 【Received Message From 'myTopic'】: 吃饭啦
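One thing worth noting about the send handler above: it never looks at the future returned by kafkaTemplate.send, so a failed send would go unnoticed. The sketch below shows the general idea of attaching a handler to an asynchronous result. It is not Spring code: java.util.concurrent.CompletableFuture stands in for Spring's ListenableFuture, and fakeSend is a hypothetical stub that only simulates the two outcomes. In the real controller the same idea would go through ListenableFuture.addCallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SendCallbackSketch {

    // Hypothetical stand-in for kafkaTemplate.send(...): no broker is contacted,
    // the future is simply completed normally or exceptionally.
    static CompletableFuture<String> fakeSend(String topic, String value, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete(topic + ":" + value);
        }
        return future;
    }

    // Turns either outcome into a status line instead of discarding the future.
    static CompletableFuture<String> report(CompletableFuture<String> sendResult) {
        return sendResult.handle((meta, ex) -> {
            if (ex == null) {
                return "sent " + meta;
            }
            // Exceptions propagated through earlier stages arrive wrapped; unwrap them.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;
            return "failed: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(report(fakeSend("myTopic", "hello", false)).join());
        System.out.println(report(fakeSend("myTopic", "hello", true)).join());
    }
}
```

Whether the HTTP response should wait for the broker's acknowledgement is a design choice; logging the failure in a callback keeps the endpoint fast while still leaving a trace when a message is lost.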