balance过程就是从存储使用率超出集群平均使用率的datanode上将超出的block移动到低于集群平均使用率的datanode上,最终满足平衡标准。
    over-utilized------>under-utilized
    over-utilized------>below-average
    above-average--->under-utilized

1. over-utilized :使用率超出阀值的datanode
2. above-average : 使用率超出平均值,但是低于阀值的datanode
3. below-average : 使用率低于平均值,但是高于阀值的datanode
4. under-utilized: 使用率低于阀值的datanode

平衡标准:参与balance的每个datanode的使用率 与 所有参与balance的全局的使用率 的差值 接近 给定的阀值threshold

balance过程分多个迭代完成,每个迭代开始时balancer按照上面的规则将所有参与balance的datanode分类、配对,形成<source,target>集合,一个<source,target>表示将source datanode上的部分block迁移到target datanode上。

每个<source,target>对应一个线程,该线程主要功能:
    1、从namenode上随机拉取一定数量的block(每次最多2G,累计20G),筛选之后保存到src_block列表中。触发拉取的条件之一:src_block列表中尚未被迁移的block数量少于5(固定值,不可配)。
    2、将src_block列表中的block提交到线程池(线程池大小:dfs.balancer.moverThreads,默认1000)进行迁移。

迁移过程使用多线程(线程数量:dfs.balancer.moverThreads)完成,每个线程每次迁移一个block。
迁移过程:
    step1:从src_block列表中选择一个block
    step2:检查block的所有副本所在的datanode中是否有跟target同机架的datanode,如果有则选择其作为proxy,如果没有则从中随机选择一个datanode作为proxy。
    step3:balancer通知target 从proxy上将block复制一份到本机。
    step4:如果复制成功,则target 请求namenode将block从source上删除,并反馈balancer。 
step1选择block时,会检查target,step2选择proxy时,会检查备选datanode,检查的内容包括:
    1、当前正在执行复制的线程数量是否超过最大线程数(dfs.datanode.balance.max.concurrent.moves)
    2、是否在禁用期。每次复制失败都会导致datanode 10秒钟内禁止做为proxy和target。

核心方法、流程:

Balancer.runOneIteration()      //开始一轮
    Dispatcher.init();          //选择参与balance的datanode
    Balancer.init();            //分组
    Balancer.chooseStorageGroups()         //构建<source, target>  优先匹配同机架、然后随意搭配
    Dispatcher.dispatchAndCheckContinue()  //以source为单位分发block移动任务
        Dispatcher.dispatchBlockMoves();   //每个source启动一个线程
            Source.dispatchBlocks();
                Source.getBlockList();     //从namenode拉取总大小为2G的block
                Source.chooseNextMove()    //每次选一个block放到线程池去移动
                    PendingMove.chooseBlockAndProxy(); //选择一个block和一个proxy
                        PendingMove.markMovedIfGoodBlock();
                            PendingMove.isGoodBlockCandidate(); //是否是合适的block
                                PendingMove.chooseProxySource() //为选择的block选择一个合适的proxy

datanode分类:

private long init(List<DatanodeStorageReport> reports) {
    // compute average utilization
    for (DatanodeStorageReport r : reports) {
      policy.accumulateSpaces(r);
    }
    policy.initAvgUtilization();

    // create network topology and classify utilization collections:
    //   over-utilized, above-average, below-average and under-utilized.
    long overLoadedBytes = 0L, underLoadedBytes = 0L;
    for(DatanodeStorageReport r : reports) {
      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
      for(StorageType t : StorageType.getMovableTypes()) {
      //所有参与balance的节点[lived 且包含在include中的节点]的使用率(已使用的存储/总的存储)
        final Double utilization = policy.getUtilization(r, t);
        if (utilization == null) { // datanode does not have such storage type
          continue;
        }

        final long capacity = getCapacity(r, t);
        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
        //thresholdDiff越小说明该datanode越理想
        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        final long maxSize2Move = computeMaxSize2Move(capacity,
            getRemaining(r, t), utilizationDiff, threshold);

        final StorageGroup g;
        if (utilizationDiff > 0) {
          final Source s = dn.addSource(t, maxSize2Move, dispatcher);
          if (thresholdDiff <= 0) { // within threshold
            aboveAvgUtilized.add(s);
          } else {
            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
            overUtilized.add(s);
          }
          g = s;
        } else {
          g = dn.addTarget(t, maxSize2Move);
          if (thresholdDiff <= 0) { // within threshold
            belowAvgUtilized.add(g);
          } else {
            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
            underUtilized.add(g);
          }
        }
        dispatcher.getStorageGroupMap().put(g);
      }
    }

    logUtilizationCollections();

    Preconditions.checkState(dispatcher.getStorageGroupMap().size()
        == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
           + belowAvgUtilized.size(),
        "Mismatched number of storage groups");

    // return number of bytes to be moved in order to make the cluster balanced
    return Math.max(overLoadedBytes, underLoadedBytes);
  }

选择proxy:

private boolean chooseProxySource() {
      final DatanodeInfo targetDN = target.getDatanodeInfo();
      // if source and target are same nodes then no need of proxy
      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
        return true;
      }
      // if node group is supported, first try add nodes in the same node group
      if (cluster.isNodeGroupAware()) {
        for (StorageGroup loc : block.getLocations()) {
          if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
              && addTo(loc)) {
            return true;
          }
        }
      }
      // check if there is replica which is on the same rack with the target
      //优选选择副本所在机器跟target在相同rack的dn作为proxy,同时需要改dn当前处理balance[发送block]的线程数量是否超标
      for (StorageGroup loc : block.getLocations()) {
        if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
          return true;
        }
      }
      // find out a non-busy replica
      //如果以上都没有选择出合适的proxy,那么就选一个不忙的dn作为proxy
      for (StorageGroup loc : block.getLocations()) {
        if (addTo(loc)) {
          return true;
        }
      }
      return false;
    }

block 迁移:

step1:balancer socket连接target,发起replaceBlock 请求,请求target从proxy上复制一个block副本到本地来替换掉source上的副本。

step2:target向proxy 发起copyBlock请求,从proxy上将block副本复制到本地,复制完成后 target 通过notifyNamenodeReceivedBlock 方法生成一个ReceivedDeletedBlockInfo对象并缓存在队列,下一次发起心跳的时候会据此对象通知namenode 将target上新加的block副本存入blockmap,并将source上对应的block 副本删除

private void dispatch() {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Start moving " + this);
      }

      Socket sock = new Socket();
      DataOutputStream out = null;
      DataInputStream in = null;
      try {
        //balaner建立跟target的连接
        sock.connect(
            NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
            HdfsServerConstants.READ_TIMEOUT);

        sock.setKeepAlive(true);

        OutputStream unbufOut = sock.getOutputStream();
        InputStream unbufIn = sock.getInputStream();
        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
            block.getBlock());
        final KeyManager km = nnc.getKeyManager();
        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
            unbufIn, km, accessToken, target.getDatanodeInfo());
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            HdfsConstants.IO_FILE_BUFFER_SIZE));
        in = new DataInputStream(new BufferedInputStream(unbufIn,
            HdfsConstants.IO_FILE_BUFFER_SIZE));
        //向target发请求,命令其复制block副本
        sendRequest(out, eb, accessToken);
        receiveResponse(in);
        nnc.getBytesMoved().addAndGet(block.getNumBytes());
        LOG.info("Successfully moved " + this);
      } catch (IOException e) {
        LOG.warn("Failed to move " + this + ": " + e.getMessage());
        target.getDDatanode().setHasFailure();
        // Proxy or target may have some issues, delay before using these nodes
        // further in order to avoid a potential storm of "threads quota
        // exceeded" warnings when the dispatcher gets out of sync with work
        // going on in datanodes.
        //迁移失败,可能是因为proxy、target当前过于繁忙(同时处理blockReplace的操作太多),所以延迟其参与balance
        proxySource.activateDelay(delayAfterErrors);
        target.getDDatanode().activateDelay(delayAfterErrors);
      } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        IOUtils.closeSocket(sock);
        //不管迁移成功还是失败,都将当前block从队列中删除
        proxySource.removePendingBlock(this);
        target.getDDatanode().removePendingBlock(this);

        synchronized (this) {
          reset();
        }
        synchronized (Dispatcher.this) {
          Dispatcher.this.notifyAll();
        }
      }
    }


 private void sendRequest(DataOutputStream out, ExtendedBlock eb,
        Token<BlockTokenIdentifier> accessToken) throws IOException {
      new Sender(out).replaceBlock(eb, target.storageType, accessToken,
          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); //第4个参数是source 的uuid(在name上唯一标示一个datanode),用于通知namenode删source上的副本
    }

target复制完成后的日志:

2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4

常见异常:

target异常:
WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed

proxy异常:
 WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed

threads quota is exceeded 说明datanode上(target、proxy)上当前正在参与balance进行blockReplace(target)和blockCopy(proxy)的线程数量超过阀值(dfs.datanode.balance.max.concurrent.moves),所以block迁移失败。 

如果大量出现此类异常,那么balance的速度会很慢:

    直接原因:block迁移失败;

    间接原因:target和proxy会进入禁用期,导致可选proxy减少,进而src_block中的block不能及时被消费,也不能拉取新的blcok

balance慢的解决办法:
1、将datanode的迁移线程数dfs.datanode.balance.max.concurrent.moves增大。增大之后threads quota is exceeded 的问题会缓解,balance速度会加快。修改此参数需重启datanode。
2、将balancer的dfs.datanode.balance.max.concurrent.moves 增大。balancer上此值应该比datanode上的值稍微小一点,因为两个股进程存在状态不同步的可能。
3、将-threshold 增大。增大之后可以优先迁移使用率最高的datanode,待使用率将下来之后,再将threshold降低。
4、如果是多namespace的情况,则可以将-policy 由默认datanode改为Pool,在迁移时应根据namespace对应blockpool的使用率来评估datanode是否要balance。

08-11 11:13