Kafka是由scala和java编写的一款高吞吐量分布式发布订阅消息系统。
应用场景:
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
- 消息通讯
相关术语:
- Broker:在集群中的服务器,用于存储消息,提供接口给生产者和消费者
- Topic:消息的一个自定义类别,每个消息都有一个topic,topic下有很多条消息,生产者和消费者通过用定义好的topic名来通讯
- Parittion:每个topic包含一个或多个分区,用于对消息进行排序,如果一个topic有多个分区,则消息的顺序不能保证,如果需要严格保证顺序,则需要将partition设置为1。同一topic的分区数只能增加不能减少。
- Producer:生产者,消息的投递方
- Consumer:消费者,消息的接收方
- Consumer Group:不同消费组的消费者在订阅同一个topic时,会拉到相同的消息,相同的消费族下的消费者在同一个topic的时候,会拉到不同分区的消息
- Leader:每个partition都有多个副本,其中一个会成为Leader,leader负责数据的读写
- Follower:Follower跟随Leader,所有写请求都需要先果果Leader,然后再广播到所有Follower。如果Leader失效,则从Follower中选举一个新的Leader,当Follower与Leader挂掉/卡住或者同步太慢,leader会把follower从ISR中删除
- Zookeeper:负责维护和协调broker,但系统新增broker或者某个broker失效,有zookeeper通知生产者和消费者,
- AR:Assigned Replicas。所有的副本
- ISR:In of sync Replicas。已同步的副本
- OSR:Out of sync Replicas。没有同步的副本
- LEO:LogEndOffset。分区最新的数据的offset。每次写入,offset都会发生变化
- HW:HighWatermark。只有写入数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,在HW之前的数据才可以被消费,保证没有同步完成的数据不会被消费者访问到
数据流图
HW和LEO
特性:
- 高吞吐量,低延迟,kafka每秒可以处理几十万条消息,延迟最低只有几毫秒
- 可扩展:集群支持热扩展
- 持久化,可靠性:消息被持久化到磁盘并支持数据备份
- 若错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发
消息发送流程:
- 指定topic/key/value
- 序列化value
- 分区:通过hash(key)/value/自定义来确定分区
- 拦截:可以通过编写拦截器,统一对消息进行格式转换
消息发送类型:
- 同步
- 异步
生产者的其他参数
- acks:0代表不等任何写入成功则马上返回,如果出现故障,生产者无感知。用于高吞吐量场景;1代表集群的leader收到消息,如果没有leader,则返回失败,并重试;-1代表所有节点都同步完,最安全
- retries:如果分区找不到leader,则会返回失败,并重试retries次,超过次数则放弃重试返回错误。
- batch.size:有很多消息要发送到同一分区时,生产者会把他们放到同一批次里,该参数代表内存可以容纳的消息的多少,相当于缓冲区
- max.request.size:单个消息的最大值,需要跟broker可以接收消息的最大值一致message.max.size。要是大于,broker会拒绝接收数据
消费者
消费者可以订阅多个topic,可以指定订阅哪个分区
位移提交
分区内,每条消息都有一个offset,用于管理消息在分区的位置,当消费者读取消息时,broker并不会更新offset,而是由消费者来commit位移
重复消费:
原因:
- 数据已经被消费,但是offset没提交
场景:
- max.poll.interval.ms:消费者两次poll操作允许的最大时间间隔,默认5分钟,如果超过这个时间,kafka会认为消费者下线,kafka会进行rebalance,导致原来的消费者连接失效,无法提交offset,而新的消费者就会重复消费这条消息
- 不同组的消费者消费同一个topic
- 消费者使用自动提交模式,当还没有提交,组内由新的消费者进来或者移除,发生rebalance,原来消费者失效,offset没有提交,消费被重复消费
- 使用异步提交,并且在callback里写了失败重试,但是没有注意顺序。例如提交5的时候,发送网络故障,由于是异步,程序继续运行,再次提交10的时候,提交成功,此时正好运行到5的重试,并且成功。当发生了rebalance,又会重复消费了数据
- 自己手动设置offset
解决方法:
- 在redis中维持offset的记录(key=topic+'-'+partition,value=offset)。每次新的消费者起来,先取出上次读到的offset,然后用seek到上次的offset的位置,然后紧接着从kafka取记录
数据丢失:
场景:
- ack=0,发送失败,就丢失了
- ack=1,leader crash,follower没来得及同步,丢失
- unclean.leader.election.enable 为 true,允许OSR的副本作为leader,当leader和ISR都crash了,OSR中的副本成为leader,数据会丢失
解决:
- ack=all/-1,retries>1,unclean.leader.election.enable=false
会影响吞吐量 - min.insync.replicas>1
生产者发送重复
原因
生产者发送消息但是没有收到broker的响应,导致生产者重试
解决方法:
- 启用幂等
- ack=0 不重试
生产者的幂等性
- 可用于解决生产者的重复发送的问题
- 原理:kafka会对每个生产者维护一个seq,每收到一条消息,seq会自增。当服务器收到seq小于当前最大的seq时,会拒绝这条消息
自动提交
消费者每次poll调用后,每隔5秒会自动向kafka提交offset
同步提交
消费者自己控制什么时候提交offset到kafka,同步等待方式,失败会重试或者抛出异常
异步提交
消费者异步提交offset到kafka,不会阻塞,(ps:不要在提交失败的回调是重试,会导致offset回退)
分组消费再平衡:
场景:
- consumer group中新增或者删除某个consumer,导致其消费分区需要分配到组内的其他consumer
- consumer订阅的topic发生变化,例如订阅topic采用正则表达式匹配,而新增或删除topic匹配正则,则会发生此topic的分区就需要分配到consumer
- consumer订阅的topic增加分区
平衡策略:
- Round Robin:会按分区和消费者的字典序轮询分配,会导致消费不均匀的情况,因为每个消费者可以特定指定自定拥有的分区,那么用轮询分配,就可能造成这些分区有可能会分配到更多的分区
- Range:会根据分区和消费者的字典序轮询分配,首先计算消费者可以得到的range是多少,然后轮询分配,最后一轮,会把剩下的全部分配给前面几个消费者。会导致分配不均匀
- Sticky:每次分配分区之前,都会对consumer根据所拥有的分区个数排序,个数小的排在前面,所以每次都会先从小到大的去分配。这样做的好处是可以尽量平均的分配分区,而且保证原有的分区不会移动到其他consumer那里去
消费拦截器
可以定义同一的入口代码,对消息进行修改或者屏蔽
Leader选举
如果leader失效,则ISR中的节点会向zookeeper抢占leader的角色,谁先第一个抢到,谁就会成为leader
分区重新分配
场景:
- 集群扩容,需要把原有topic分区进行重新分配,否则新增节点不会负载已存在的topic
集群缩容
- pending
存储结构:
每一个parition(文件夹)会平均分配到大小相同的segment文件中
每个文件仅需要顺序读写
segment文件由index文件和data文件组成
日志清理:
- 定时清理
- 指定达到一定大小进行清理
事务
幂等性可以解决一个分区不重复,但是不能解决多个分区的运作,生产者可以通过事务对多个分区进行写操作,并确保要么全部成功,要么全部失败
控制器
集群中会有一个或者多个broker,其中一个会选举为控制器(kafka controler),它负责整个集群所有分区和副本的状态,当某个分区的leader出现故障,控制器负责该分区leader的选举,当检测到某个分区的ISR发生变化,由控制器通知所有broker更新元数据;当某个topic增加分区时,由控制器负责分区的重新分配
消息一致性
削峰限流例子
大量客户端发送请求,服务器有可能资源不够,导致大量请求失败,并不能在短时间内处理大量的请求,可以用MQ做缓冲,客户端把请求发送到MQ,server根据自己的能力拉取消息,并把response的消息推送到MQ,客户端再拉取消息。
- 优点:可以支持大量的请求,不会出现大量请求失败
- 缺点:使用MQ是用时间换成功率,时延会拉长