本文主要讨论kafka 分区的初始分配、重分配(本文不讨论consumer情况)

首先,从kafka的底层实现来说,主题与分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对于一个日志文件(物理层面)。

要研究kafka分区的初始分配、重分配需知晓以下概念。

1、AR(Assigned Replicas):分区中的所有副本

2、ISR(In-Sync Replicas)与leader副本保持一定程度同步的副本(包括leader副本)

3、OSR(Out-of-Sync Replicas)与leade副本同步滞后过多的副本。

由此可见AR=ISR+OSR,一般情况下AR=ISR,OSR为空,kafka集群的一个broker中最多只能有一个它的副本,我们可以将leader副本所在的broker节点叫做分区的leader节点,而follower副本所在的broker节点叫做分区的follower节点。

接下来开始本文的重点讨论

一、kafka 分区的初始分配

这里的分区分配☞为集群创建指定主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。

在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有则按照kakfa内部实现计算分配。内部分配分2种策略:1、未指定机架 2、指定机架 ,如果集群中的所有broker节点都没有配置broker.rack参数,则采用disable-rack-aware参数来创建主题,否则采用指定机架创建主题。

1、未指定机架策略:均匀分配所有副本到所有的broker上

private def assignReplicasToBrokersRackUnaware(
nPartitions: Int, //分区数
replicationFactor: Int, //副本因子
brokerList: Seq[Int], //集群中的broker列表
fixedStartIndex: Int , //起始索引,即第一个副本分配的位置,默认值为-1
startPatitionId: Int  //起始分区编号,默认值为-1
):
Map[Int,Seq[Int]] = {
	val ret = mutable.Map[Int,Seq[Int]]()  // 保存分配结果的集合
	val brokerArray = brokerList.toArray  //brokerId的列表
	//如果起始索引fixedStartIndex小于0,则根据broker列表的长度随机生成一个,以此来保证是有效的brokerId
	val startIndex = if(fixedStartIndex>0) fixedStartIndex
	   else rand.nextInt(brokerArray.length)
	//确保起始分区号不小于0
	var currentPartitionId = math.max(0,startPatitionId)
	//指定了副本的间隔,目的是为了更均匀的将副本分配到不同的broker上
	var nextReplicaShift = if(fixedStartIndex>=0) fixedStartIndex
	   else rand.nextInt(brokerArray.length)
	//轮询所有分区,将每个分区的副本分配到不同的broker上
	for(_<- 0 until nPartitions){
		if(currentPartitionId>0 && currentPartitionId%brokerArray.length == 0)
			nextReplicaShift+=1
		//从broker-list中选定一个随机的位置,从这个位置开始,将每一个partition的第一个replica依次赋予brokerList中的broker.
		val firstReplicaIndex = (currentPartitionId+startIndex)%brokerArray.length
		val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
		//保存该分区所有副本分配的broker集合
		for(j<- 0 until replicationFactor - 1)
			replicaBuffer+=brokerArray(replicaIndex(firstReplicaIndex,nextReplicaShift,j,brokerArray.length))//为其余的副本分配broker
		//保存该分区的所有副本的分配信息
		ret.put(currentPartitionId,replicaBuffer)
		//继续为下一个分区分配副本
		currentPartitionId+=1
	}
	ret
}
//1,0,1,3  当分配好了第一个replica之后,剩下的replica以第一个replica所在的broker为基准,依次赋予之后的broker
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
}

2、指定机架分配分区

如机架与broker节点的对照关系如下:

rack1:0,1,2

rack2:3,4,5

rack3:6,7,8

未指定机架的brokeArray为[0,1,2,3,4,5,6,7,8],而指定机架的brokerArray为[0,3,6,1,4,7,2,5,8],循环为每个分区分配副本,尽量保证分区在机架中均匀分布,这里不是简单的将这个broker添加到当前分区的副本列表中,还要经过一层筛选,满足以下任意一个条件的broker不能被添加到当前分区的副本列表中:1、如果此broker所在的机架中已经存在一个broker拥有该分区的副本,并且还有其他机架中没有任何一个broker拥有该分区的副本 2、如果此broker中已经拥有了该分区的副本,并且还有其他broker中没有该分区的副本

二、kafka分区的重分配

1、优先副本的选取

分区使用多副本机制来提升可靠性,leader副本负责读写,follower只负责在内部进行消息的同步,一旦leader节点挂掉,kafka需要从follower副本中挑选一个作为leader副本。如:原先3个broker,3个分区,当其中一个broker挂掉之后,剩下的2个broker承担的分区数变成2、1,如此一来,均衡的负载变得不均衡,为了有效的治理负载均衡问题,kakfa引入了优先副本的概念,所谓的优先副本是指AR集合列表的第一个副本,理想情况下,优先副本就是该分区的leader副本。kafka要确保所有的主题的优先副本在kafka中均匀分布,这样就保证了整个kafka集群的负载均衡。

于此:有个参数需要了解:

auto.leader.rebalance.enable:此参数的默认值为true,如果一个分区有3个副本,且3个副本的优先级为0,1,2,根据副本优先概念,0会作为leader,当0挂掉后,会启动1作为leader,当0恢复后,会重新启用0作为leader,这就是分区自动平衡,保证负载均衡,执行周期由参数leader.imbalance.check.interval.seconds控制,默认5分钟,分区自动平衡会造成客户端的堵塞,生产环境不建议开启,可手动执行分区平衡。

2、分区重分配

当一个节点的分区副本不可用时,kafka不会自动将这些失效的分区副本迁移到集群剩余的可用的broker节点上,会影响负载均衡以及服务的可用性、可靠性,当集群中新增一个节点时,只有新创建的主题分区才有可能分配到这个节点上,之前的主题分区不会自动分配到这个新的节点上,也会影响其负载均衡

为了解决上诉问题,引入分区重分配方案,kafka提供了kafka-reassign-partitions.sh脚本来执行分区重分配的工作(集群扩容、节点失效场具下工作)                主要分为以下3个步骤:首先创建一个包含主题清单的JSON文件,其次根据主题清单和broker节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。

具体操作,后续本地运行一遍后补上

参考:《深入理解kafka核心设计与实践原理》

03-27 21:03