一. kafka入门
1.1 kafka副本
kafka副本(replication)包含两种类型:leader副本,follower副本
leader副本负责读写请求,follower副本负责同步leader副本消息,通过副本选举实现故障转移。
1.2 kafka分区
虽然有了副本机制可以保证数据的持久化或消息不丢失,但没有解决...极客时间版权所有: https://time.geekbang.org/column/article/99318
1.3 kafka持久化
kafka使用消息日志(log)来保存数据,一个日志就是磁盘上一个只能追加写消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也实现了kafka高吞吐特性的一个重要手段。另外kafka还使用了日志段机制,在kafka底层,一个日志又进一步细分为多个日志段,消息被追加写到当前最新的日志段中,当写满一个日志后,kafka会自动切分出一个新的日志段,并将老的日志段封存起来。kafka在后台还有定时任务会定期检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
笔记:
- kafka副本机制使用的是异步消息拉取,因此leader和follower之间存在不一致性,且消息从leader副本同步到follwer副本需要一定的时间
- 使用单分区保证 kafka 是按照顺序消费的
1.4 kafka分类
- Apache kafka,也称社区版kafka。优势在于迭代速度快,社区响应度高,使用它可以有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级特性。
- Confluent kafka,Confluent公司提供的kafka。优势在于集成了很多高级特性且由kafka原班人马打造,质量有保证;缺陷在于相关文档不全,普及率较低,没有太多可供参考的范例。
- CDH/HDP kafka,大数据公司提供的kafka,内嵌Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,严禁速度较慢。
1.5 kafka 最重要的配置
1.5.1 Broker 端参数
- log.dirs: 指定broker需要使用的若干个文件目录路径,多路径以逗号分隔。
log.dir: 补充第一个参数,只能设置单个路径
zookeeper.connect: zookeeper 地址。格式zk1:2181,zk2:2181,zk3:2181 。 如果多kafka集群使用同一个zookeeper集群,则通过chroot来区别。现假如有2个kafka集群,分别为kafka01,kafka02. 格式为zk1:2181,zk2:2181,zk32181/kafka01和zk1:2181,zk2:2181,zk32181/kafka02
- listeners: 监听器
advertised.listeners:
auto.create.topics.enable: 是否允许自动创建topic。生产环境最好设置成不允许自动创建topic。否则会有很多稀奇古怪的topic被自动创建。
unclean.leader.election.enable: 是否允许unclean leader 选举。建议设置成false。如果设置成false,导致的后果是分区不可用。反之,如果设置成true,可能造成的结果就是丢数据。
auto.leader.rebalance.enable: 是否允许定期选举。
log.retention.{hour|minutes|ms}: 都是控制一条消息被保存多长时间。从优先级上来说ms设置最高,minutes次之,hour最低。一般情况下设置为hour级别多一些。比如
log.retention.hour=168
表示保存7天数据,自动删除7天前的数据。log.retention.bytes: 表示指定broker为消息保存的总磁盘大小。默认为-1,表示不控制大小
message.max.bytes: 控制broker能够接受的最大消息大小。默认为1000012,不到1MB,建议调大该值。
配置多路径的优势
提升读写性能:比起单块硬盘,多块硬盘同时读写数据有更高的吞吐量。
能够实现故障转移:即Failover。这是kafka1.1版本引入的强大功能。之前版本,kafka broker使用的任何一块硬盘挂掉,整个broker进程都会关闭。1.1之后可以将坏掉的磁盘上的数据自动转移到其他正常磁盘。而且broker能自动
1.5.2 topic级别参数
- retention.ms: 规定该消息保存时长,默认7天。优先级大于全局设定值。
- retention.bytes: 规定该topic预留多大磁盘空间,默认-1.表示可以无限使用磁盘空间。
- max.message.bytes: 正常接收的topic最大消息大小。
下面为创建一个topic名为test,设置单分区,单副本,保留半年,消息最大为5M。
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
1.5.3 JVM端配置
- KAFKA_HEAP_OPTS: 指定堆大小
- KAFKA_JVM_PERFORMANCE_OPTS: 指定GC参数
$ export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$ export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$ bin/kafka-server-start.sh config/server.properties
1.5.4 操作系统参数
unlimit -n 100000 #增大文件描述符限制
文件系统选择 xfs