如何在Kafka上创建topic?

手工脚本创建

./kafka-topics.sh –zookeeper 127.0.0.1:2181 –create –topic test.example –replication-factor 2 –partitions 12
  • -topic制定topic的name
  • –partitions指定分区数,这个参数要根据broker数和数据量决定,一般情况下每个Broker上两个分区最好
  • –replication-factor指定partition的replicas数,建议设置为2

自动创建

开启自动创建配置:auto.create.topics.enable=true ,使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。

如何在Kafka上对一个Topic增加partition?

通过kafka-topics.sh工具的alter命令,将test.example的partitions从12增加到20

./kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic test.example

如何在Kafka上对一个Topic增加replicas?

操作步骤日下:

  • 查看topic的详细信息
./kafka-topics.sh –zookeeper 127.0.0.1:2181 –describe –topic test.example
  • 修改配置文件

    创建json文件partitions-to-move.json ,修改内容如下:
{
“partitions”:
[
{
“topic”: “test.example”,
“partition”: 0,
“replicas”: [0,4]
}
,version”:1
}
  • 执行一下脚本
kafka-reassign-partitions.sh –zookeeper 127.0.0.1:2181 –reassignment-json-file partitions-to-move.json –execute
  • 检查修改情况
kafka-topics.sh –zookeeper 127.0.0.1:2181 –describe –topic test.example

如何在Kafka中对Topic的leader进行均衡?

在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader

  • 对所有topic进行操作
./kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181
  • 对特定的topic操作

编写json文件

{
"partitions":
[
{"topic":"test.example","partition": "0"}
]
}

执行一下脚本

./kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181 --path-to-json-file *.json

Kafka下线broker的操作

主动下线是指broker运行正常,因为机器需要运维(升级操作系统,添加磁盘等)而主动停止broker,分两种情况处理:

所有的topic的replica >= 2

此时,直接停止一个broker,会自动触发leader election操作,不过目前leader election是逐个partition进行,等待所有partition完成leader election耗时较长,这样不可服务的时间就比较长。为了缩短不可服务时间窗口,可以主动触发停止broker操作,这样可以逐个partition转移,直到所有partition完成转移,再停止broker。

./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60

shutdown Broker

./kafka-server-stop.sh

存在topic的replica=1

当存在topic的副本数小于2,只能手工把当前broker上这些topic对应的partition转移到其他broker上。当此broker上剩余的topic的replica > 2时,参照上面的处理方法继续处理

04-11 13:34