前言


在HDFS中,每时每刻都在进行着大量block块的创建和删除操作,这些庞大的block块构建起了这套复杂的分布式系统.普通block的读写删除操作一般人都或多或少了解过一些,可是过量的副本清理机制是否有人知道呢,就是overReplicatedBlock的处理,针对过量的副本块,HDFS怎么处理,何时处理,处理的策略机制怎样,本文就给大家分享HDFS在这方面的知识.

过量副本块以及发生的场景


过量副本块的意思通俗解释就是集群中有A副本3个,满足标准的3副本策略,可是此时发生了某种场景后,A副本块突然变为5个了,为了达到副本块的标准系数3个,系统就会进行多余2块副本的清除动作,而这个清除动作就是本文所要重点描写叙述的.过量副本块的现象是比較好解释的,那么问题来了,究竟有哪些潜在的原因或条件会触发多余副本块的发生呢(在此指的是HDFS中)?

本人通过对HDFS源代码的阅读,总结出一下3点

  • ReCommission节点又一次上线.这类操作是运维操作引起的.节点下线操作会导致大量此节点的block块在集群中大量拷贝,一旦此节点取消下线,之前已拷贝的大量块必定会成为多余的副本块.

  • 人为又一次设置block replication副本数.还是以A副本举例,A副本当前满足标准副本数3个,此时用户张三通过使用hdfs的API方法setReplication人为设置副本数为1.此时也会早A副本数多余2个的情况,即使说HDFS中的副本标准系数还是3个.

  • 新增加的block块记录在系统中被丢失.这种可能相对于前2种case的情况,是内部因素造成.这些新增加的丢失的block块记录会在BlockManager进行再次扫描检測,防止出现过量副本的现象.

OK,以上3种情形就是可能发生过量副本块的原因.至于这3种情况是怎样一步步的终于调用到处理多余副本块的过程在后面的描写叙述中会再给出,先来看下多余副本块是怎样被选出并处理掉的.

OverReplication多余副本块处理


多余副本块的处理分为2个子过程:

  • 多余副本块的选出
  • 选出的多余副本块的处理

我们从源代码中进行一一寻找原因,首先副本块的选出.

多余副本块的选择


进入blockManager的processOverReplicatedBlock方法,非常显然,方法名已经表明了方法操作的本意了.

/**
* Find how many of the containing nodes are "extra", if any.
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
private void processOverReplicatedBlock(final Block block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {

此方法的凝视的意思是找出存在”多余”的节点,假设他们是多余的,调用chooseExcessReplicates并标记他们,增加增加到excessReplicateMap中.以下进行细节的处理

// 节点列表变量的声明
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
// 从corruptReplicas变量中获取是否存在坏的block所在的节点
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);

继续后面的处理

    // 遍历此过量副本块所在的节点列表
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
...
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
.getDatanodeUuid());
// 假设在当前过量副本图对象excessReplicateMap中不存在
if (excessBlocks == null || !excessBlocks.contains(block)) {
//而且所在节点不是已下线或下线中的节点
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// 而且这个副本块不是损坏的副本块
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
// 将此过滤副本块的一个所在节点增加候选节点列表中
nonExcess.add(storage);
}
}
}
}

所以从这里看出nonExcess对象事实上是一个候选节点的概念,将block副本块所在的节点列表进行多种条件的再推断和剔除.最后就调用到选择终于过量副本块节点的方法

chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, blockplacement);

进入chooseExcessReplicates方法

    // first form a rack to datanodes map and
// 首先会形成机架对datanode节点的映射关系图
BlockCollection bc = getBlockCollection(b);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); // 初始化机架->节点列表映射图对象
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
// 超过1个副本数的节点列表
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
// 恰好1个副本数的节点列表
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();

为什么要划分不同节点列表的选择呢.由于在这里设计者做了优先选择,在相同拥有多余副本块的节点列表中,优先选择节点中副本数多于1个的,其次再是副本数恰好有1个的节点.这个设计非常好理解,由于你上面的多余副本数很多其它嘛,我当然要先从多的開始删.

    // 节点划分成相应2个集合
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes
replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);

进入划分方法

  public void splitNodesWithRack(
final Iterable<DatanodeStorageInfo> storages,
final Map<String, List<DatanodeStorageInfo>> rackMap,
final List<DatanodeStorageInfo> moreThanOne,
final List<DatanodeStorageInfo> exactlyOne) {
// 遍历候选节点列表,形成机架->节点列表的相应关系
for(DatanodeStorageInfo s: storages) {
final String rackName = getRack(s.getDatanodeDescriptor());
List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
if (storageList == null) {
storageList = new ArrayList<DatanodeStorageInfo>();
rackMap.put(rackName, storageList);
}
storageList.add(s);
}

以下给出的划分算法

    // split nodes into two sets
for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
// 假设机架中相应的节点数量仅仅有1个,则节点上副本数就为1,否则就为多个
exactlyOne.add(storageList.get(0));
} else {
// moreThanOne contains nodes on rack with more than one replica
moreThanOne.addAll(storageList);
}
}

上面划分的原理应该是与相应的block副本存放策略原理相关,这个我到没有细致去了解原因,读者能够自行阅读相关BlockPlacementPolicy相关代码进行了解.于是在这段代码过后,节点组就被分为了2大类,exactlyOne和moreThanOne.至此chooseExcessReplicates的上半段代码运行完毕,接下来看下半段代码的运行过程

    // 选择一个待删除的节点会偏向delNodeHintStorage的节点
// pick one node to delete that favors the delete hint
// 否在会从节点列表中选出一个可用空间最小
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
final DatanodeStorageInfo delNodeHintStorage
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
final DatanodeStorageInfo addedNodeStorage
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);

上面这3行凝视传达出2个意思

  • 能够直接传入要删除的节点,假设能够,则优选选择传入的delHint节点
  • 在每一个节点的内部列表中,优选会选择出可用空间最少的,这个也好理解,相同的副本数的节点列表中,当然要选择可用空间尽可能少的,以便释放出多的空间.
    // 假设眼下过量副本所在节点数大于标准副本数,则进行循环移除
while (nonExcess.size() - replication > 0) {
final DatanodeStorageInfo cur;
// 推断能否够使用delNodeHintStorage节点进行取代
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
moreThanOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
// 否则进行常规的节点选择
cur = replicator.chooseReplicaToDelete(bc, b, replication,
moreThanOne, exactlyOne, excessTypes);
}

推断能否够使用delNodeHintStorage节点的推断逻辑这里就忽略了,主要看一下关键的chooseReplicaToDelete方法,这个分支处理才是最经经常使用到的处理方式.

    // 选择的节点要么是心跳时间最老的或者是可用空间最少的
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
if (!excessTypes.contains(storage.getStorageType())) {
continue;
}

first和second的节点选择逻辑例如以下,非常的简单

  /**
* Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> first,
Collection<DatanodeStorageInfo> second) {
return first.isEmpty() ? second : first;
}

在节点列表的每次迭代循环中会进行以下2个指标的对照

      // 进行心跳时间的对照
if (lastHeartbeat < oldestHeartbeat) {
oldestHeartbeat = lastHeartbeat;
oldestHeartbeatStorage = storage;
}
// 进行可用空间的对照
if (minSpace > free) {
minSpace = free;
minSpaceStorage = storage;
}

然后进行选择,优先选择心跳时间最老的

    final DatanodeStorageInfo storage;
if (oldestHeartbeatStorage != null) {
storage = oldestHeartbeatStorage;
} else if (minSpaceStorage != null) {
storage = minSpaceStorage;
} else {
return null;
}

然后进行以下2个操作

      // 又一次进行rackMap对象关系的调整
// adjust rackmap, moreThanOne, and exactlyOne
replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
exactlyOne, cur);
// 将选出的节点从候选节点列表中移除
nonExcess.remove(cur);

能够说,到了这里,多余副本块所在节点就被选出了.

多余副本块的处理


多余副本块的处理就显得非常easy了,反正目标对象以及所在节点已经找到了,增加到相应的对象中就可以.

      // 增加到excessReplicateMap对象中
addToExcessReplicate(cur.getDatanodeDescriptor(), b); //
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
// 将此节点上的b block增加到无效节点中
addToInvalidates(b, cur.getDatanodeDescriptor());

增加到invalidates无效block列表后不久,此block就将被清除.

多余副本块清除的场景调用


又一次回到之前提到过的多余副本块的3大场景调用.有人可能会好奇我是怎么找到这3种使用场景的,通过查看chooseExcessReplicates哪里被调用就能够了,例如以下图所看到的

HDFS怎样检測并删除多余副本块-LMLPHP

针对上述的5种调用情况,于是我总结了3种使用场景.以下一一进行对照.

场景1: ReCommission又一次上线过程


在方法processOverReplicatedBlocksOnReCommission中调用了清除过量副本块的方法

  /**
* Stop decommissioning the specified datanode.
* @param node
*/
@VisibleForTesting
public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
decomNodeBlocks.remove(node);
} else {
LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
node, node.getAdminState());
}
}

下线操作又一次恢复,必定要停止正在下线的动作,所以会在这种方法中进行调用.

场景2: SetReplication人为设置副本数


人为设置副本数是一个主动因素,调用的直接方法例如以下:

  /** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) {
if (newRepl == oldRepl) {
return;
} // update needReplication priority queues
for(Block b : blocks) {
updateNeededReplications(b, 0, newRepl-oldRepl);
}
// 当设置的新的副本数值比原有的小的时候,须要进行过量副本的清除操作
if (oldRepl > newRepl) {
// old replication > the new one; need to remove copies
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
for(Block b : blocks) {
processOverReplicatedBlock(b, newRepl, null, null);
}
} else { // replication factor is increased
LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
}
}

这个API方法是能够被外面的Client端程序调用触发的.

场景3: 丢失新增加的block记录信息


丢失新增加的block信息导致集群中存在多余的副本.官方的解释是这种:

  /*
* Since the BlocksMapGset does not throw the ConcurrentModificationException
* and supports further iteration after modification to list, there is a
* chance of missing the newly added block while iterating. Since every
* addition to blocksMap will check for mis-replication, missing mis-replication
* check for new blocks will not be a problem.
*/

由于存在丢失block信息的可能性,所以会开单独的线程去又一次检測是否存在过量副本的现象.

  private void processMisReplicatesAsync() throws InterruptedException {
...
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
int processed = 0;
namesystem.writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
BlockInfoContiguous block = blocksItr.next();
MisReplicationResult res = processMisReplicatedBlock(block);
...

场景4: 其它场景的检測


其它场景有的时候也会调用到processOverReplicatedBlock的方法,但不是外界的因素导致,而是出于一种慎重性的考虑,比方在addStoredBlock,当新增加的块被增加到blockMap中时,会再次进行块的检測.还有1种是在文件终于写入完毕的时候,也会调用一次checkReplication此方法,来确认集群中没有多余的相同块的情况.这2种情况的调用如上图所看到的,这里就不放出详细的代码了,可见,HDFS的设计者在细节方面的处理真的是非常用心啊.

05-12 07:19