如何在Kafka上创建topic?
手工脚本创建
./kafka-topics.sh –zookeeper 127.0.0.1:2181 –create –topic test.example –replication-factor 2 –partitions 12
- -topic制定topic的name
- –partitions指定分区数,这个参数要根据broker数和数据量决定,一般情况下每个Broker上两个分区最好
- –replication-factor指定partition的replicas数,建议设置为2
自动创建
开启自动创建配置:auto.create.topics.enable=true ,使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。
如何在Kafka上对一个Topic增加partition?
通过kafka-topics.sh工具的alter命令,将test.example的partitions从12增加到20
./kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic test.example
如何在Kafka上对一个Topic增加replicas?
操作步骤日下:
- 查看topic的详细信息
./kafka-topics.sh –zookeeper 127.0.0.1:2181 –describe –topic test.example
- 修改配置文件
创建json文件partitions-to-move.json ,修改内容如下:
{
“partitions”:
[
{
“topic”: “test.example”,
“partition”: 0,
“replicas”: [0,4]
}
,version”:1
}
- 执行一下脚本
kafka-reassign-partitions.sh –zookeeper 127.0.0.1:2181 –reassignment-json-file partitions-to-move.json –execute
- 检查修改情况
kafka-topics.sh –zookeeper 127.0.0.1:2181 –describe –topic test.example
如何在Kafka中对Topic的leader进行均衡?
在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader
- 对所有topic进行操作
./kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181
- 对特定的topic操作
编写json文件
{
"partitions":
[
{"topic":"test.example","partition": "0"}
]
}
执行一下脚本
./kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181 --path-to-json-file *.json
Kafka下线broker的操作
主动下线是指broker运行正常,因为机器需要运维(升级操作系统,添加磁盘等)而主动停止broker,分两种情况处理:
所有的topic的replica >= 2
此时,直接停止一个broker,会自动触发leader election操作,不过目前leader election是逐个partition进行,等待所有partition完成leader election耗时较长,这样不可服务的时间就比较长。为了缩短不可服务时间窗口,可以主动触发停止broker操作,这样可以逐个partition转移,直到所有partition完成转移,再停止broker。
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
shutdown Broker
./kafka-server-stop.sh
存在topic的replica=1
当存在topic的副本数小于2,只能手工把当前broker上这些topic对应的partition转移到其他broker上。当此broker上剩余的topic的replica > 2时,参照上面的处理方法继续处理