本文主要讨论kafka 分区的初始分配、重分配(本文不讨论consumer情况)
首先,从kafka的底层实现来说,主题与分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对于一个日志文件(物理层面)。
要研究kafka分区的初始分配、重分配需知晓以下概念。
1、AR(Assigned Replicas):分区中的所有副本
2、ISR(In-Sync Replicas)与leader副本保持一定程度同步的副本(包括leader副本)
3、OSR(Out-of-Sync Replicas)与leade副本同步滞后过多的副本。
由此可见AR=ISR+OSR,一般情况下AR=ISR,OSR为空,kafka集群的一个broker中最多只能有一个它的副本,我们可以将leader副本所在的broker节点叫做分区的leader节点,而follower副本所在的broker节点叫做分区的follower节点。
接下来开始本文的重点讨论
一、kafka 分区的初始分配
这里的分区分配☞为集群创建指定主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。
在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有则按照kakfa内部实现计算分配。内部分配分2种策略:1、未指定机架 2、指定机架 ,如果集群中的所有broker节点都没有配置broker.rack参数,则采用disable-rack-aware参数来创建主题,否则采用指定机架创建主题。
1、未指定机架策略:均匀分配所有副本到所有的broker上
private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分区数
replicationFactor: Int, //副本因子
brokerList: Seq[Int], //集群中的broker列表
fixedStartIndex: Int , //起始索引,即第一个副本分配的位置,默认值为-1
startPatitionId: Int //起始分区编号,默认值为-1
):
Map[Int,Seq[Int]] = {
val ret = mutable.Map[Int,Seq[Int]]() // 保存分配结果的集合
val brokerArray = brokerList.toArray //brokerId的列表
//如果起始索引fixedStartIndex小于0,则根据broker列表的长度随机生成一个,以此来保证是有效的brokerId
val startIndex = if(fixedStartIndex>0) fixedStartIndex
else rand.nextInt(brokerArray.length)
//确保起始分区号不小于0
var currentPartitionId = math.max(0,startPatitionId)
//指定了副本的间隔,目的是为了更均匀的将副本分配到不同的broker上
var nextReplicaShift = if(fixedStartIndex>=0) fixedStartIndex
else rand.nextInt(brokerArray.length)
//轮询所有分区,将每个分区的副本分配到不同的broker上
for(_<- 0 until nPartitions){
if(currentPartitionId>0 && currentPartitionId%brokerArray.length == 0)
nextReplicaShift+=1
//从broker-list中选定一个随机的位置,从这个位置开始,将每一个partition的第一个replica依次赋予brokerList中的broker.
val firstReplicaIndex = (currentPartitionId+startIndex)%brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
//保存该分区所有副本分配的broker集合
for(j<- 0 until replicationFactor - 1)
replicaBuffer+=brokerArray(replicaIndex(firstReplicaIndex,nextReplicaShift,j,brokerArray.length))//为其余的副本分配broker
//保存该分区的所有副本的分配信息
ret.put(currentPartitionId,replicaBuffer)
//继续为下一个分区分配副本
currentPartitionId+=1
}
ret
}
//1,0,1,3 当分配好了第一个replica之后,剩下的replica以第一个replica所在的broker为基准,依次赋予之后的broker
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
2、指定机架分配分区
如机架与broker节点的对照关系如下:
rack1:0,1,2
rack2:3,4,5
rack3:6,7,8
未指定机架的brokeArray为[0,1,2,3,4,5,6,7,8],而指定机架的brokerArray为[0,3,6,1,4,7,2,5,8],循环为每个分区分配副本,尽量保证分区在机架中均匀分布,这里不是简单的将这个broker添加到当前分区的副本列表中,还要经过一层筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表中:1、如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他机架中没有任何一个broker拥有该分区的副本 2、如果此broker中已经拥有了该分区的副本,并且还有其他broker中没有该分区的副本
二、kafka分区的重分配
1、优先副本的选取
分区使用多副本机制来提升可靠性,leader副本负责读写,follower只负责在内部进行消息的同步,一旦leader节点挂掉,kafka需要从follower副本中挑选一个作为leader副本。如:原先3个broker,3个分区,当其中一个broker挂掉之后,剩下的2个broker承担的分区数变成2、1,如此一来,均衡的负载变得不均衡,为了有效的治理负载均衡问题,kakfa引入了优先副本的概念,所谓的优先副本是指AR集合列表的第一个副本,理想情况下,优先副本就是该分区的leader副本。kafka要确保所有的主题的优先副本在kafka中均匀分布,这样就保证了整个kafka集群的负载均衡。
于此:有个参数需要了解:
auto.leader.rebalance.enable:此参数的默认值为true,如果一个分区有3个副本,且3个副本的优先级为0,1,2,根据副本优先概念,0会作为leader,当0挂掉后,会启动1作为leader,当0恢复后,会重新启用0作为leader,这就是分区自动平衡,保证负载均衡,执行周期由参数leader.imbalance.check.interval.seconds控制,默认5分钟,分区自动平衡会造成客户端的堵塞,生产环境不建议开启,可手动执行分区平衡。
2、分区重分配
当一个节点的分区副本不可用时,kafka不会自动将这些失效的分区副本迁移到集群剩余的可用的broker节点上,会影响负载均衡以及服务的可用性、可靠性,当集群中新增一个节点时,只有新创建的主题分区才有可能分配到这个节点上,之前的主题分区不会自动分配到这个新的节点上,也会影响其负载均衡
为了解决上诉问题,引入分区重分配方案,kafka提供了kafka-reassign-partitions.sh脚本来执行分区重分配的工作(集群扩容、节点失效场具下工作) 主要分为以下3个步骤:首先创建一个包含主题清单的JSON文件,其次根据主题清单和broker节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。
具体操作,后续本地运行一遍后补上
参考:《深入理解kafka核心设计与实践原理》