2.1安装部署
2.1.1 集群规划
本例使用单机集群方式,在本地启用三份程序组成集群, 使用一个Zookeeper集群。
需要
1.安装JDK8 Deepin 安装jdk1.8
2.配置Zookeeper集群,参见 Zookeeper单机为伪集群
2.1.2 jar包下载
地址: http://kafka.apache.org/downloads.html
2.1.3 集群部署
-
解压安装包
tar -zxvf kafka_2.12-2.7.0.tgz -C /usr/local/kafka/
2. 修改解压后的文件名称
/usr/local/kafka/kafka01
3. 创建logs文件夹
4. 需要修改的配置文件内容:
broker.id=0 #broker的id 每个节点一个不能重复,本例中三个节点分别为0,1,2
delete.topic.enable=true #开启删除topic功能##
host.name=127.0.0.1 #本机名或者ip
port=9091 #默认为9092 每个id内Kakfa服务器监听的端口
log.dirs=/usr/local/kafka/kafka01/logs #日志存放路径
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 #zookeeper连接端口
5. 复制 /kafka01 到 kafka02,kafka03
6.修改kafka02,kafka03对应的配置
- kafka02/config/server.properties
broker.id=1 #broker的id 每个节点一个不能重复,本例中三个节点分别为0,1,2
delete.topic.enable=true #开启删除topic功能##
host.name=127.0.0.1 #本机名或者ip
port=9092 #默认为9092 每个id内Kakfa服务器监听的端口
log.dirs=/usr/local/kafka/kafka02/logs #日志存放路径
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 #zookeeper连接端口
- kafka03/config/server.properties
broker.id=2 #broker的id 每个节点一个不能重复,本例中三个节点分别为0,1,2
delete.topic.enable=true #开启删除topic功能##
host.name=127.0.0.1 #本机名或者ip
port=9093 #默认为9092 每个id内Kakfa服务器监听的端口
log.dirs=/usr/local/kafka/kafka03/logs #日志存放路径
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 #zookeeper连接端口
7.启动集群
先启动zookeeper集群
/usr/local/zookeeper/zookeeper01/bin/zkServer.sh start
/usr/local/zookeeper/zookeeper02/bin/zkServer.sh start
/usr/local/zookeeper/zookeeper03/bin/zkServer.sh start
再启动kafka集群
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka01/config/server.properties
root@bluejay-PC:/# /usr/local/kafka/kafka02/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka02/config/server.properties
root@bluejay-PC:/# /usr/local/kafka/kafka03/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka03/config/server.properties
8. 关闭集群
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-server-stop.sh
root@bluejay-PC:/# /usr/local/kafka/kafka02/bin/kafka-server-stop.sh
root@bluejay-PC:/# /usr/local/kafka/kafka03/bin/kafka-server-stop.sh
9.kafka群起脚本
2.2 Kafka命令行操作
1. 创建topic
root@bluejay-PC:/# /usr/local/kafka/kafka03/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 1 --partitions 1 --topic first
Created topic first.
2. 查看当前服务器中的所有 topic
root@bluejay-PC:/# /usr/local/kafka/kafka03/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
first
3. 消费消息-开启监听
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9091 --topic first
4. 发送消息
root@bluejay-PC:/# /usr/local/kafka/kafka03/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9091 --topic first
>hello world
>atguigu atguigu
>zhanghl^H^H^H
>end
>
>^C
5. 查看某个Topic的详情
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic first
Topic: first PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: first Partition: 0 Leader: 2 Replicas: 2 Isr: 2
6. 修改分区数
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic first --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
7. 删除topic
root@bluejay-PC:/# /usr/local/kafka/kafka01/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic first