hdfs Balancer剖析

balance過程就是從存儲使用率超出集羣平均使用率的datanode上將超出的block移動到低於集羣平均使用率的datanode上,最終知足平衡標準。
    over-utilized------>under-utilized
    over-utilized------>below-average
    above-average--->under-utilizedjava

1. over-utilized :使用率超出閥值的datanode
2. above-average : 使用率超出平均值,可是低於閥值的datanode
3. below-average : 使用率低於平均值,可是高於閥值的datanode
4. under-utilized: 使用率低於閥值的datanode
node

平衡標準:參與balance的每一個datanode的使用率 與 全部參與balance的全局的使用率 的差值 接近 給定的閥值thresholdapache

 

balance過程分多個迭代完成,每一個迭代開始時balancer按照上面的規則將全部參與balance的datanode分類、配對,造成<source,target>集合,一個<source,target>表示將source datanode上的部分block遷移到target datanode上。緩存

每一個<source,target>對應一個線程,該線程主要功能:
    一、從namenode上隨機拉取必定數量的block(每次最多2G,累計20G),篩選以後保存到src_block列表中。觸發拉取的條件之一:src_block列表中還沒有被遷移的block數量少於5(固定值,不可配)。
    二、將src_block列表中的block提交到線程池(線程池大小:dfs.balancer.moverThreads,默認1000)進行遷移。多線程

遷移過程使用多線程(線程數量:dfs.balancer.moverThreads)完成,每一個線程每次遷移一個block。
遷移過程:
    step1:從src_block列表中選擇一個block
    step2:檢查block的全部副本所在的datanode中是否有跟target同機架的datanode,若是有則選擇其做爲proxy,若是沒有則從中隨機選擇一個datanode做爲proxy。
    step3:balancer通知target 從proxy上將block複製一份到本機。
    step4:若是複製成功,則target 請求namenode將block從source上刪除,並反饋balancer。 
step1選擇block時,會檢查target,step2選擇proxy時,會檢查備選datanode,檢查的內容包括:
    一、當前正在執行復制的線程數量是否超過最大線程數(dfs.datanode.balance.max.concurrent.moves)
    二、是否在禁用期。每次複製失敗都會致使datanode 10秒鐘內禁止作爲proxy和target。
socket

核心方法、流程:oop

Balancer.runOneIteration()      //開始一輪
    Dispatcher.init();          //選擇參與balance的datanode
    Balancer.init();            //分組
    Balancer.chooseStorageGroups()         //構建<source, target>  優先匹配同機架、而後隨意搭配
    Dispatcher.dispatchAndCheckContinue()  //以source爲單位分發block移動任務
        Dispatcher.dispatchBlockMoves();   //每一個source啓動一個線程
            Source.dispatchBlocks();  
                Source.getBlockList();     //從namenode拉取總大小爲2G的block
                Source.chooseNextMove()    //每次選一個block放到線程池去移動
                    PendingMove.chooseBlockAndProxy(); //選擇一個block和一個proxy
                        PendingMove.markMovedIfGoodBlock();
                            PendingMove.isGoodBlockCandidate(); //是不是合適的block
                                PendingMove.chooseProxySource() //爲選擇的block選擇一個合適的proxy

datanode分類:ui

private long init(List<DatanodeStorageReport> reports) {
    // compute average utilization
    for (DatanodeStorageReport r : reports) {
      policy.accumulateSpaces(r);
    }
    policy.initAvgUtilization();

    // create network topology and classify utilization collections: 
    //   over-utilized, above-average, below-average and under-utilized.
    long overLoadedBytes = 0L, underLoadedBytes = 0L;
    for(DatanodeStorageReport r : reports) {
      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
      for(StorageType t : StorageType.getMovableTypes()) {
      //全部參與balance的節點[lived 且包含在include中的節點]的使用率(已使用的存儲/總的存儲)
        final Double utilization = policy.getUtilization(r, t); 
        if (utilization == null) { // datanode does not have such storage type 
          continue;
        }
        
        final long capacity = getCapacity(r, t);
        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
        //thresholdDiff越小說明該datanode越理想
        final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
        final long maxSize2Move = computeMaxSize2Move(capacity,
            getRemaining(r, t), utilizationDiff, threshold);

        final StorageGroup g;
        if (utilizationDiff > 0) {
          final Source s = dn.addSource(t, maxSize2Move, dispatcher);
          if (thresholdDiff <= 0) { // within threshold
            aboveAvgUtilized.add(s);
          } else {
            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
            overUtilized.add(s);
          }
          g = s;
        } else {
          g = dn.addTarget(t, maxSize2Move);
          if (thresholdDiff <= 0) { // within threshold
            belowAvgUtilized.add(g);
          } else {
            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
            underUtilized.add(g);
          }
        }
        dispatcher.getStorageGroupMap().put(g);
      }
    }

    logUtilizationCollections();
    
    Preconditions.checkState(dispatcher.getStorageGroupMap().size()
        == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
           + belowAvgUtilized.size(),
        "Mismatched number of storage groups");
    
    // return number of bytes to be moved in order to make the cluster balanced
    return Math.max(overLoadedBytes, underLoadedBytes);
  }

選擇proxy:this

private boolean chooseProxySource() {
      final DatanodeInfo targetDN = target.getDatanodeInfo();
      // if source and target are same nodes then no need of proxy
      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
        return true;
      }
      // if node group is supported, first try add nodes in the same node group
      if (cluster.isNodeGroupAware()) {
        for (StorageGroup loc : block.getLocations()) {
          if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
              && addTo(loc)) {
            return true;
          }
        }
      }
      // check if there is replica which is on the same rack with the target
      //優選選擇副本所在機器跟target在相同rack的dn做爲proxy,同時須要改dn當前處理balance[發送block]的線程數量是否超標
      for (StorageGroup loc : block.getLocations()) {
        if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
          return true;
        }
      }
      // find out a non-busy replica
      //若是以上都沒有選擇出合適的proxy,那麼就選一個不忙的dn做爲proxy
      for (StorageGroup loc : block.getLocations()) {
        if (addTo(loc)) {
          return true;
        }
      }
      return false;
    }

block 遷移:spa

step1:balancer socket鏈接target,發起replaceBlock 請求,請求target從proxy上覆制一個block副本到本地來替換掉source上的副本。

step2:target向proxy 發起copyBlock請求,從proxy上將block副本複製到本地,複製完成後 target 經過notifyNamenodeReceivedBlock 方法生成一個ReceivedDeletedBlockInfo對象並緩存在隊列,下一次發起心跳的時候會據此對象通知namenode 將target上新加的block副本存入blockmap,並將source上對應的block 副本刪除

private void dispatch() {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Start moving " + this);
      }

      Socket sock = new Socket();
      DataOutputStream out = null;
      DataInputStream in = null;
      try {
        //balaner創建跟target的鏈接
        sock.connect(
            NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
            HdfsServerConstants.READ_TIMEOUT);

        sock.setKeepAlive(true);

        OutputStream unbufOut = sock.getOutputStream();
        InputStream unbufIn = sock.getInputStream();
        ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
            block.getBlock());
        final KeyManager km = nnc.getKeyManager(); 
        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
            unbufIn, km, accessToken, target.getDatanodeInfo());
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            HdfsConstants.IO_FILE_BUFFER_SIZE));
        in = new DataInputStream(new BufferedInputStream(unbufIn,
            HdfsConstants.IO_FILE_BUFFER_SIZE));
        //向target發請求,命令其複製block副本
        sendRequest(out, eb, accessToken);
        receiveResponse(in);
        nnc.getBytesMoved().addAndGet(block.getNumBytes());
        LOG.info("Successfully moved " + this);
      } catch (IOException e) {
        LOG.warn("Failed to move " + this + ": " + e.getMessage());
        target.getDDatanode().setHasFailure();
        // Proxy or target may have some issues, delay before using these nodes
        // further in order to avoid a potential storm of "threads quota
        // exceeded" warnings when the dispatcher gets out of sync with work
        // going on in datanodes.
        //遷移失敗,多是由於proxy、target當前過於繁忙(同時處理blockReplace的操做太多),因此延遲其參與balance
        proxySource.activateDelay(delayAfterErrors);
        target.getDDatanode().activateDelay(delayAfterErrors);
      } finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        IOUtils.closeSocket(sock);
        //無論遷移成功仍是失敗,都將當前block從隊列中刪除
        proxySource.removePendingBlock(this);
        target.getDDatanode().removePendingBlock(this);

        synchronized (this) {
          reset();
        }
        synchronized (Dispatcher.this) {
          Dispatcher.this.notifyAll();
        }
      }
    }


 private void sendRequest(DataOutputStream out, ExtendedBlock eb,
        Token<BlockTokenIdentifier> accessToken) throws IOException {
      new Sender(out).replaceBlock(eb, target.storageType, accessToken,
          source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); //第4個參數是source 的uuid(在name上惟一標示一個datanode),用於通知namenode刪source上的副本
    }

target複製完成後的日誌:

2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4

常見異常:

target異常:
WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed

proxy異常:
 WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed

threads quota is exceeded 說明datanode上(target、proxy)上當前正在參與balance進行blockReplace(target)和blockCopy(proxy)的線程數量超過閥值(dfs.datanode.balance.max.concurrent.moves),因此block遷移失敗。 

若是大量出現此類異常,那麼balance的速度會很慢:

    直接緣由:block遷移失敗;

    間接緣由:target和proxy會進入禁用期,致使可選proxy減小,進而src_block中的block不能及時被消費,也不能拉取新的blcok

balance慢的解決辦法: 一、將datanode的遷移線程數dfs.datanode.balance.max.concurrent.moves增大。增大以後threads quota is exceeded 的問題會緩解,balance速度會加快。修改此參數需重啓datanode。 二、將balancer的dfs.datanode.balance.max.concurrent.moves 增大。balancer上此值應該比datanode上的值稍微小一點,由於兩個股進程存在狀態不一樣步的可能。 三、將-threshold 增大。增大以後能夠優先遷移使用率最高的datanode,待使用率將下來以後,再將threshold下降。 四、若是是多namespace的狀況,則能夠將-policy 由默認datanode改成Pool,在遷移時應根據namespace對應blockpool的使用率來評估datanode是否要balance。

相關文章
相關標籤/搜索