1. 背景
shard allocation
意思是分片分配
, 是一个将分片分配到节点的过程; 可能发生该操作的过程包括:
- 初始恢复(
initial recovery
) - 副本分配(
replica allocation
) - 重新平衡(
rebalance
) - 节点的新增和删除
分片的分配操作, 是由 master
角色的节点来决定什么时候移动分片, 以及移动到哪个节点上, 以达到集群的均衡;
2. 机制分析
2.1. Allocation
触发条件
- 新增或删除
index
索引 node
节点的新增或删除- 执行
reroute
命令 - 修改
replica
副本数量 - 集群重启
具体对应源码解释:来源
2.2. Rebalance
的触发条件
在 rebalance
之前会经过 2.3.2
中介绍的所有策略里实现的 canRebalance
方法, 全部通过后才会执行下面的 Rebalance
过程;
Rebalance
过程是通过调用 balanceByWeights()
方法, 计算 shard 所在的每个 node 的 weight
值,
其中:
numAdditionalShards
一般为 0, 调用weightShardAdded
,weightShardRemoved
方法时分别取值为1
和-1
;- theta0 =
cluster.routing.allocation.balance.shard
系统动态配置项, 默认值为0.45f
; - theta1 =
cluster.routing.allocation.balance.index
系统动态配置项, 默认值为0.55f
;
源码如下:
private static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
}
float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
2.3. 源码分析
分片分配就是把一个分片分配到集群中某个节点的过程, 其中分配决策包含了两个方面:
- 哪些分片应该分配到哪些节点上
- 哪个分片作为主分片, 哪个作为副本分片
Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator
和 deciders
;
allocator
寻找最优的节点来分配分片;deciders
负责判断并决定是否要进行分配;
- 新建的索引
allocator
负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator
的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;
deciders
依次遍历 allocator
给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
- 已有的索引
allocator
对于主分片, 只允许把主分片指定在已经拥有该分片完整数据的节点上; 对于副本分片, 则是先判断其他节点上是否已有该分片的数据的拷贝, 如果有这样的节点, allocator
则优先把分片分配到这其中一个节点上;
2.3.1. Allocator
PrimaryShardAllocator
找到拥有某Shard
最新数据(主分片)的节点;ReplicaShardAllocator
找到磁盘上拥有这个Shard
数据(副本分片)的节点;BalancedShardsAllocator
找到拥有最少Shard
个数的节点;
public class BalancedShardsAllocator implements ShardsAllocator {
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope);
private volatile WeightFunction weightFunction;
private volatile float threshold;
}
2.3.2. Deciders
Deciders
决策期基础组件的抽象类为 AllocationDecider
:
public abstract class AllocationDecider {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRebalance(RoutingAllocation allocation) {
return Decision.ALWAYS;
}
}
ES 7.4.0 中的 Decider
决策器包括以下所示, 他们均实现上面的 AllocationDecider
抽象类, 并重写 canRebalance
, canAllocate
, canRemain
, canForceAllocatePrimary
等方法;
决策器比较多, 大致分类如下, 并列举决策器对应的配置项:
2.3.2.1. 负载均衡类
SameShardAllocationDecider
: 避免主副分片分配到同一个节点;AwarenessAllocationDecider
: 感知分配器, 感知服务器, 机架等, 尽量分散存储 Shard;ShardsLimitAllocationDecider
: 同一个节点上允许存在同一个index
的shard
数目;
2.3.2.2. 并发控制类
ThrottlingAllocationDecider
:recovery
阶段的限速配置, 避免过多的recovering allocation
导致该节点的负载过高;ConcurrentRebalanceAllocationDecider
:rebalace
并发控制, 表示集群同时允许进行rebalance
操作的并发数量;DiskThresholdDecider
: 根据节点的磁盘剩余量来决定是否分配到该节点上;
2.3.2.3. 条件限制类
RebalanceOnlyWhenActiveAllocationDecider
: 所有Shard
都处于active
状态下才可以执行rebalance
操作;FilterAllocationDecider
: 通过接口动态设置的过滤器;cluster
级别会覆盖index
级别;ReplicaAfterPrimaryActiveAllocationDecider
: 保证只在主分片分配完成后(active
状态)才开始分配副本分片;ClusterRebalanceAllocationDecider
: 通过集群中active
的shard
状态来决定是否可以执行rebalance
;MaxRetryAllocationDecider
: 防止 shard 在失败次数达到上限后继续分配;
2.3.2.4. 其他决策类
EnableAllocationDecider
: 设置允许分配的分片类型;index
级别配置会覆盖cluster
级别配置;NodeVersionAllocationDecider
: 检查分片所在 Node 的版本是否高于目标 Node 的 ES 版本;SnapshotInProgressAllocationDecider
: 决定snapshot
期间是否允许allocation
, 因为snapshot
只会发生在主分片上, 所以该配置只会限制主分片的allocation
;
接下来介绍一下在 Elasticsearch 中涉及到 Allocation
和 Rebalance
的相关配置项;
3. cluster-level 配置
3.1. Shard allocation 配置
控制分片的分配和恢复;
3.2. Shard rebalancing 配置
控制集群之间的分片平衡;
3.3. 分片平衡启发式
以下配置用于决定每个分片的存放位置; 当rebalancing
操作不再使任何节点的权重超过balance.threshold
时, 集群即达到平衡;
4. Index-level 配置
以下配置控制每个索引中的分片分配;
4.1. index-level 分片分配过滤(来源)
配置需要分两步:
- 在每个 Elasticsearch 节点的
elasticsearch.yml
配置文件中添加自定义节点属性, 比如以small
,medium
,big
区分节点类型, 则配置文件中可添加:
node.attr.size: medium
或者在启动 Elasticsearch 服务时, 在命令行里添加 ./bin/elasticsearch -Enode.attr.size=medium
;
- 在新建索引的
mapping
时, 添加index.routing.allocation.include/exclude/require.size: medium
的过滤配置即可;
PUT <index_name>/_settings
{
"index.routing.allocation.include.size": "medium"
}
可以配置多个自定义节点属性, 并且必须同时满足索引里配置的多个过滤条件;
- index.routing.allocation.include.{attribute}: {values}
- index.routing.allocation.require.{attribute}: {values}
- index.routing.allocation.exclude.{attribute}: {values}
其中 {attribute}
可以是上面提到的自定义节点属性, ES 自己也有一些内置的节点属性:
其中 {values}
可以是单个值, 也可以是逗号分隔的多个值, 也可以使用通配符 *
进行模糊匹配;
4.2. 设置延迟分配, 当节点离开时(来源)
当某个节点由于突发原因, 比如网络中断, 人为操作重启等, 需要暂时离开集群时, 集群会立刻新建副本分片以替换丢失的副本, 然后在剩余的所有节点之间进行rebalancing
, 这样导致在短时间内该突发节点又恢复过来后, 原先的副本就无法再使用, 集群会将刚才新建的副本分片再拷贝回到该节点上; 这样就会造成不必要的资源浪费, 以及节点分片rebalancing
带来的波动;
可以使用 index.unassigned.node_left.delayed_timeout
动态设置来延迟由于节点离开而导致未分配的副本分片的分配问题; 该配置默认值 1m
;
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
修改成以上配置后, 如果在 5m
内, 该节点可以恢复重新加入集群, 则集群会自动恢复该节点的副本分片分配, 恢复速度很快;
4.3. 索引恢复的优先级(来源)
索引分片恢复的优先级按照:
- 可选的
index.priority
配置, 值越大优先级越高; index
索引的创建日期, 越新的索引优先级越高;index
索引的名称;
4.4. 每个节点的分片总数(来源)
这些配置是硬性配置, 可能会导致一些分片无法分配, 需要慎重配置;