Environment: Tencent Cloud, CentOS 7

1. Download

http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz

2. Extract

tar -xvf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0 /usr/java/kafka2.11
cd /usr/java/kafka2.11

3. Start and test

ZooKeeper and the Kafka broker both run in the foreground, so start each one in its own terminal.

(a) Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
(b) Start the Kafka broker
bin/kafka-server-start.sh config/server.properties
(c) List topics
bin/kafka-topics.sh --zookeeper localhost:2181 --list
(d) Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
(e) Describe the topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
(f) Publish messages to the topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
(g) Consume messages from the topic
Older versions use the --zookeeper option, which is deprecated; newer versions reject it with "zookeeper is not a recognized option":
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1
Newer versions use --bootstrap-server instead:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo1 --from-beginning

4. Install a Kafka web UI

  a) Download: https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar

  b) Run

  mkdir -p /mydata/kafkamonitorlogs

  java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m \
    -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
    com.quantifind.kafka.offsetapp.OffsetGetterWeb \
    --zk 132.232.44.82:2181 \
    --port 8787 \
    --refresh 10.seconds \
    --retain 7.days \
    1>/mydata/kafkamonitorlogs/stdout.log 2>/mydata/kafkamonitorlogs/stderr.log &

  c) Open the web UI in a browser (replace ip with the server's address)

http://ip:8787

My virtual machine has too little memory to display the message list, but the web UI itself does work.

Done!

######## Spring Boot integration ########

1. Add the dependency to pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Add the configuration to the yml file

spring:
  profiles:
    active: @activatedProperties@
  kafka:
    bootstrap-servers: 132.232.44.82:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
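
For readers more familiar with the plain Kafka client, the consumer settings above correspond to the client configuration keys shown below. This is only an illustrative sketch using java.util.Properties: the class ConsumerProps and its build method are names made up for this example, and Spring Boot does this mapping itself, so nothing like this has to be added to the project.

```java
import java.util.Properties;

// Hypothetical helper, not part of the project: shows which Kafka client
// keys the spring.kafka.* settings from the yml file correspond to.
final class ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", groupId);
        // spring.kafka.consumer.enable-auto-commit
        props.setProperty("enable.auto.commit", "true");
        // spring.kafka.consumer.auto-commit-interval (milliseconds)
        props.setProperty("auto.commit.interval.ms", "1000");
        // spring.kafka.consumer.key-deserializer / value-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("132.232.44.82:9092", "test");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With enable.auto.commit set to true, the client commits offsets in the background at the given interval, so the listener does not acknowledge records itself.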

3. Add the following to Kafka's config/server.properties (use the same address that clients connect to in bootstrap-servers)

advertised.listeners=PLAINTEXT://132.232.44.82:9092
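
If the application cannot connect, a quick first check is whether the broker port is reachable from the client machine at all. The sketch below uses only the JDK; PortCheck and isReachable are names made up for this example, the default address is the one from bootstrap-servers, and the 3-second timeout is an arbitrary choice. A successful TCP connection only shows that the port is open (for example, not blocked by a cloud firewall rule), not that Kafka itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for troubleshooting, not part of the project.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "132.232.44.82";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```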

4、KafkaConsumer.java

package com.cn.commodity.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer test
 */
@Component
public class KafkaConsumer {

    // Called for every record received on test_topic1
    @KafkaListener(topics = "test_topic1")
    public void listen(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

5、KafkaProducer.java

package com.cn.commodity.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka producer test
 */
@RestController
@RequestMapping("kafka")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publishes the msg request parameter to test_topic1
    @RequestMapping("send")
    public String send(String msg) {
        kafkaTemplate.send("test_topic1", msg);
        return "success";
    }
}
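
To try the endpoint, send a request to /kafka/send with a msg parameter; the consumer from step 4 should then print the record. Below is a small sketch of such a call using the JDK's HttpURLConnection. The class SendClient, the host localhost, and port 8080 (Spring Boot's default) are assumptions, so adjust them to match your application.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical test client, not part of the project.
final class SendClient {

    /** Builds the request URL, URL-encoding the message so spaces and non-ASCII text survive. */
    static String buildUrl(String host, int port, String msg) {
        try {
            return "http://" + host + ":" + port + "/kafka/send?msg="
                    + URLEncoder.encode(msg, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Sends a GET request to the endpoint and returns the HTTP status code. */
    static int send(String host, int port, String msg) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(buildUrl(host, port, msg)).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the Spring Boot application is running locally on port 8080
        System.out.println("HTTP status: " + send("localhost", 8080, "hello kafka"));
    }
}
```

Opening the same URL in a browser works equally well, since the send mapping does not restrict the HTTP method.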

Start the application and run it. Done!

05-28 22:00