[Original] HDFS Basics (2): Replica Count Checking and Replication Logic in HDFS

HDFS periodically checks whether any file has fewer replicas than required and, if so, triggers replication logic to bring it back up to the configured replica count:

  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
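As a side note, the replica count can also be changed per file at runtime. Below is a minimal sketch using the public FileSystem API, assuming a standard Hadoop client setup; the class name and the path /tmp/example.txt are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
  public static void main(String[] args) throws Exception {
    // Picks up hdfs-site.xml from the classpath, including dfs.replication.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path file = new Path("/tmp/example.txt"); // hypothetical path
    FileStatus status = fs.getFileStatus(file);
    System.out.println("current replication: " + status.getReplication());

    // Request a different replica count for this file; the NameNode will
    // later schedule extra copies (or deletions) to converge on it.
    fs.setReplication(file, (short) 2);

    fs.close();
  }
}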

The implementation is a ReplicationMonitor thread started inside BlockManager:

org.apache.hadoop.hdfs.server.blockmanagement.BlockManager

  /**
   * Periodically calls computeReplicationWork().
   */
  private class ReplicationMonitor implements Runnable {

    @Override
    public void run() {
      while (namesystem.isRunning()) {
        try {
          // Process replication work only when active NN is out of safe mode.
          if (namesystem.isPopulatingReplQueues()) {
            computeDatanodeWork();
            processPendingReplications();
          }
          Thread.sleep(replicationRecheckInterval);
        } catch (Throwable t) {
...

Note: the sleep interval replicationRecheckInterval comes from the configuration dfs.namenode.replication.interval, which defaults to 3, i.e. the thread wakes up every 3 seconds.
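A minimal sketch of how such a key resolves, assuming only the standard org.apache.hadoop.conf.Configuration API; the seconds-to-milliseconds conversion is my assumption about how the value feeds the Thread.sleep() in the loop above:

import org.apache.hadoop.conf.Configuration;

public class RecheckIntervalExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(); // loads hdfs-site.xml if present
    // Same key and default as in the note above; the value is in seconds.
    int intervalSec = conf.getInt("dfs.namenode.replication.interval", 3);
    long intervalMs = intervalSec * 1000L; // assumed conversion for Thread.sleep()
    System.out.println("replication recheck interval: " + intervalMs + " ms");
  }
}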

 

  /**
   * Compute block replication and block invalidation work that can be scheduled
   * on data-nodes. The datanode will be informed of this work at the next
   * heartbeat.
   * 
   * @return number of blocks scheduled for replication or removal.
   */
  int computeDatanodeWork() {
    // Blocks should not be replicated or removed if in safe mode.
    // It's OK to check safe mode here w/o holding lock, in the worst
    // case extra replications will be scheduled, and these will get
    // fixed up later.
    if (namesystem.isInSafeMode()) {
      return 0;
    }

    final int numlive = heartbeatManager.getLiveDatanodeCount();
    final int blocksToProcess = numlive
        * this.blocksReplWorkMultiplier;
    final int nodesToProcess = (int) Math.ceil(numlive
        * this.blocksInvalidateWorkPct);

    int workFound = this.computeReplicationWork(blocksToProcess);

Note: the multiplier blocksReplWorkMultiplier comes from the configuration dfs.namenode.replication.work.multiplier.per.iteration, which defaults to 2, i.e. each iteration processes at most (number of live datanodes × 2) blocks;
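A quick worked example with hypothetical numbers, assuming the default 0.32 for dfs.namenode.invalidate.work.pct.per.iteration (the source of blocksInvalidateWorkPct):

  // Hypothetical cluster: 50 live datanodes, default settings.
  int numlive = 50;
  int blocksToProcess = numlive * 2;                      // 100 blocks this iteration
  int nodesToProcess = (int) Math.ceil(numlive * 0.32f);  // 16 nodes this iteration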

 

  /**
   * Scan blocks in {@link #neededReplications} and assign replication
   * work to data-nodes they belong to.
   *
   * The number of process blocks equals either twice the number of live
   * data-nodes or the number of under-replicated blocks whichever is less.
   *
   * @return number of blocks scheduled for replication during this iteration.
   */
  int computeReplicationWork(int blocksToProcess) {
    List<List<Block>> blocksToReplicate = null;
    namesystem.writeLock();
    try {
      // Choose the blocks to be replicated
      blocksToReplicate = neededReplications
          .chooseUnderReplicatedBlocks(blocksToProcess);
    } finally {
      namesystem.writeUnlock();
    }
    return computeReplicationWorkForBlocks(blocksToReplicate);
  }

  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
...
          // Add block to the to be replicated list
          rw.srcNode.addBlockToBeReplicated(block, targets);
          scheduledWork++;

Note: concretely, each block to be replicated is added, together with its targets, to the pending list of the source datanode that holds a copy;
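Conceptually, srcNode.addBlockToBeReplicated just enqueues a (block, targets) pair on that datanode's descriptor, and the queue is drained later when the datanode heartbeats in. A simplified sketch of that structure (names and types simplified; not the actual Hadoop classes):

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Simplified sketch of the per-datanode queue that addBlockToBeReplicated
// feeds and that getReplicationCommand later drains (see DatanodeManager below).
class DatanodeDescriptorSketch {

  // Pairs a block with the target datanodes it should be copied to;
  // stands in for the real BlockTargetPair.
  static class BlockTargetPair {
    final String block;          // stand-in for the real Block type
    final List<String> targets;  // stand-in for DatanodeDescriptor targets
    BlockTargetPair(String block, List<String> targets) {
      this.block = block;
      this.targets = targets;
    }
  }

  private final Queue<BlockTargetPair> replicateBlocks = new LinkedList<>();

  // Called by the NameNode when it schedules replication work on this node.
  void addBlockToBeReplicated(String block, List<String> targets) {
    replicateBlocks.add(new BlockTargetPair(block, targets));
  }

  // Called while handling this node's heartbeat; returns at most
  // maxTransfers entries, or null if nothing is pending.
  List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
    List<BlockTargetPair> out = new ArrayList<>();
    while (out.size() < maxTransfers && !replicateBlocks.isEmpty()) {
      out.add(replicateBlocks.poll());
    }
    return out.isEmpty() ? null : out;
  }
}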

 

Next, look at the DatanodeManager code:

org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager

  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes
      ) throws IOException {
...
        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
        //check pending replication
        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
              maxTransfers);
        if (pendingList != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
              pendingList));
        }

Note: then, when DatanodeManager handles a heartbeat from that datanode, it packs the pending replication work into a DNA_TRANSFER command in the heartbeat response; maxTransfer is computed as

      final int maxTransfer = blockManager.getMaxReplicationStreams()
          - xmitsInProgress;

getMaxReplicationStreams reads the configuration dfs.namenode.replication.max-streams, which defaults to 2, i.e. a single datanode has at most 2 blocks being replicated at the same time;
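A quick worked example with hypothetical numbers:

      // Hypothetical: default max-streams of 2, and the heartbeat reports
      // one transfer already in progress on this datanode.
      int maxReplicationStreams = 2;
      int xmitsInProgress = 1;
      int maxTransfer = maxReplicationStreams - xmitsInProgress; // at most 1 new block now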
