源碼|HDFS之NameNode:啓動過程

仿照源碼|HDFS之DataNode:啓動過程,NameNode也從啓動過程開始。java

namenode的啓動過程與HA牢牢綁定在一塊兒,但本文暫不討論HA相關內容,之後再填HA的坑。node

源碼版本:Apache Hadoop 2.6.0git

可參考猴子追源碼時的速記打斷點,親自debug一遍。github

開始以前

總覽

namenode的啓動過程圍繞着safemode、HA等展開,啓動以後,各類工做線程開始發揮做用。主要包括:安全

  • 加載fsimage與editlog
  • 啓動多種工做線程,主要包括:
    • 通訊:RpcServer
    • 監控:JVMPauseMonitor、PendingReplicationMonitor、DecommissionManager#Monitor、HeartbeatManager#Monitor、ReplicationMonitor、LeaseManager#Monitor、NameNodeResourceMonitor
    • 其餘:HttpServer (Web UI)、NameNodeEditLogRoller
  • HA相關(暫不討論)
  • 關閉safemode

CacheManager#CacheReplicationMonitor等暫不討論。網絡

文件管理機制

namenode經過FSNamesystem管理文件元數據。具體來說:多線程

  • 經過FSDirectory管理文件系統的命名空間
  • 經過BlockManager管理文件到數據塊的映射和數據塊到數據節點的映射

namenode上的數據塊狀態

與yarn不一樣,hdfs並無直接用狀態機來管理block,而是將不一樣狀態的block存儲在不一樣的緩衝區中,狀態遷移則對應數據塊在不一樣緩衝區中的移動。包括BlockManager#blocksMap,namenode上的數據塊狀態共涉及如下幾種緩衝區:分佈式

  • 正在寫入的數據塊:經過LeaseManager掃描INodeFile,BlockInfo#isComplete()返回false即爲正在寫入的數據塊(不經常使用)
  • 存儲數據塊的元信息,能夠認爲存儲已完成等全部狀態的數據塊:BlockManager#blocksMap,BlockInfo#isComplete()返回true
  • 須要複製的數據塊:BlockManager#neededReplications
  • 正在複製的數據塊:BlockManager#pendingReplications
  • 複製超時的數據塊:BlockManager#pendingReplications#timedOutItems
  • 多餘的數據塊:BlockManager#excessReplicateMap(即須要刪除的數據塊)
  • 無效數據塊緩衝區BlockManager#invalidateBlocks(即正在刪除的數據塊)

BlockManager#excessReplicateMap與BlockManager#neededReplications對應,BlockManager#invalidateBlocks與BlockManager#pendingReplications對應,爲了統一,本文相應稱BlockManager#excessReplicateMap爲「須要刪除數據塊緩衝區」,稱BlockManager#invalidateBlocks爲「正在刪除數據塊緩衝區」。oop

其中,與數據塊(對應多個副本)寫入密切相關的LeaseManager;與副本複製任務密切相關的是BlockManager#neededReplications、BlockManager#pendingReplications、BlockManager#pendingReplications#timedOutItems,與副本刪除密切相關的是BlockManager#excessReplicateMap、BlockManager#invalidateBlocks。性能

詳細的狀態轉換邏輯在分析完源碼後,再來總結。

文章的組織結構

  1. 若是隻涉及單個分支的分析,則放在同一節。
  2. 若是涉及多個分支的分析,則在下一級分多個節,每節討論一個分支。
  3. 多線程的分析同多分支。
  4. 每個分支和線程的組織結構遵循規則1-3。

主流程

namenode的Main Class是NameNode,先找NameNode.main():

public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      // 建立namenode
      NameNode namenode = createNameNode(argv, null);
      // 等待namenode關閉
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.fatal("Failed to start namenode.", e);
      terminate(1, e);
    }
  }
  
  ...
  
  public void join() {
    try {
      // 等待RPCServer關閉,其餘守護進程會自動關閉
      rpcServer.join();
    } catch (InterruptedException ie) {
      LOG.info("Caught interrupted exception ", ie);
    }
  }
複製代碼

NameNode#join()等待namenode關閉,基本邏輯同datanode。

主要看NameNode.createNameNode():

public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    // 解析啓動選項
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      ...// 其餘分支
      default: {
        // 正常啓動的話
        // 初始化metric系統
        DefaultMetricsSystem.initialize("NameNode");
        // 建立NameNode
        return new NameNode(conf);
      }
    }
  }
複製代碼

正常啓動的話,知足startOpt == StartupOption.REGULAR,會走到default分支。

NameNode.<init>()

public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
  }
  
  ...

  protected NameNode(Configuration conf, NamenodeRole role) throws IOException { 
    this.conf = conf;
    this.role = role;
    // 設置NameNode#clientNamenodeAddress爲"hdfs://localhost:9000"
    setClientNamenodeAddress(conf);
    String nsId = getNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    // HA相關
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    state = createHAState(getStartupOption(conf));
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
      initializeGenericKeys(conf, nsId, namenodeId);
      // 完成實際的初始化工做
      initialize(conf);
      // HA相關
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stop();
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stop();
      throw e;
    }
  }
複製代碼

這裏要特別說明部分HA的內容:

儘管本地的僞分佈式集羣沒法開啓HA(對應NameNode#haEnabled爲false),namenode仍然擁有一個HAState,此時,namenode會被標記爲active(對應HAState ACTIVE_STATE = new ActiveState()),而後在ActiveState#enterState()中啓動LeaseManager#Monitor、NameNodeEditLogRoller等。

具體來說,在NameNode#initialize()完成實際的初始化工做返回後,還要執行ActiveState#enterState(),完成一些只有active狀態namenode才應該作的工做,如:

  • 打開FsDirectory的quota檢查
  • 啓動LeaseManager#Monitor
  • 啓動NameNodeResourceMonitor
  • 啓動NameNodeEditLogRoller
  • 啓動CacheManager#CacheReplicationMonitor等

後面會專門討論HA機制,讀者知道什麼時候啓動了這些工做線程便可。

下面繼續看NameNode#initialize():

protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    // 初始化metric
    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    // 啓動httpServer
    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }

    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);

    // 從`${dfs.namenode.name.dir}`目錄加載fsimage與editlog,初始化FsNamesystem、FsDirectory、LeaseManager等
    loadNamesystem(conf);

    // 建立RpcServer,封裝了NameNodeRpcServer#clientRpcServer,支持ClientNamenodeProtocol、DatanodeProtocolPB等協議
    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using 
      // the RPC server's bind address.
      clientNamenodeAddress = 
          NetUtils.getHostPortString(rpcServer.getRpcAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    
    // 啓動JvmPauseMonitor等,反向監控JVM
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    
    // 啓動多項重要的工做線程
    startCommonServices(conf);
  }
複製代碼

實際上,NameNode#loadNamesystem()很是重要,但限於篇幅和精力,猴子只是大概追蹤了下流程,不少細節來不及分析,就不在此處展開了。

當前namenode的角色爲NamenodeRole.NAMENODE,則此處啓動HttpServer;JvmPauseMonitor也在此處啓動。

重頭戲是NameNode#startCommonServices():

private void startCommonServices(Configuration conf) throws IOException {
    // 建立NameNodeResourceChecker、激活BlockManager等
    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();
    // 角色非`NamenodeRole.NAMENODE`的在此處啓動HttpServer
    if (NamenodeRole.NAMENODE != role) {
      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    // 啓動RPCServer
    rpcServer.start();
    ...// 啓動各插件
    LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
    if (rpcServer.getServiceRpcAddress() != null) {
      LOG.info(getRole() + " service RPC up at: "
          + rpcServer.getServiceRpcAddress());
    }
  }
複製代碼

RPCServer的啓動很簡單,重點是FSNamesystem#startCommonServices():

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      // 建立NameNodeResourceChecker,並當即檢查一次
      nnResourceChecker = new NameNodeResourceChecker(conf);
      checkAvailableResources();
      assert safeMode != null && !isPopulatingReplQueues();
      // 設置一些啓動過程當中的信息
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAFEMODE);
      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        getCompleteBlocksTotal());
      // 設置已完成的數據塊總量
      setBlockTotal();
      // 激活BlockManager
      blockManager.activate(conf);
    } finally {
      writeUnlock();
    }
    
    registerMXBean();
    DefaultMetricsSystem.instance().register(this);
    snapshotManager.registerMXBean();
  }
複製代碼

提醒一下,此時的工做是任何角色的namenode(對於HA來講,即active與standby)都須要作的,「common」即此意。

NameNodeResourceChecker負責檢查磁盤資源。active狀態的namenod會啓動一個監控線程NameNodeResourceMonitor,按期執行NameNodeResourceChecker#hasAvailableDiskSpace()檢查可用的磁盤資源。

FSNamesystem#setBlockTotal()設置已完成的數據塊總量completeBlocksTotal = blocksTotal - numUCBlocks。其中,blocksTotal來自BlockManager,numUCBlocks來自LeaseManager,均從備份中恢復而來。

注意,儘管此時BlockManager#blocksMap從備份中「恢復」了所有數據塊,但這些數據塊的BlockInfo#triplets是空的,由於datanode尚未將數據塊的信息彙報到namenode(猴子只啓動了namenode)。

BlockManager#activate()所謂的激活BlockManager,主要完成了PendingReplicationMonitor、DecommissionManager#Monitor、HeartbeatManager#Monitor、ReplicationMonitor等工做線程的啓動:

public void activate(Configuration conf) {
    // 啓動PendingReplicationMonitor
    pendingReplications.start();
    // 激活DatanodeManager:啓動DecommissionManager#Monitor、HeartbeatManager#Monitor
    datanodeManager.activate(conf);
    // 啓動BlockManager#ReplicationMonitor
    this.replicationThread.start();
  }
複製代碼

啓動PendingReplicationMonitor:PendingReplicationBlocks#start()

PendingReplicationBlocks#start()啓動PendingReplicationMonitor:

void start() {
    timerThread = new Daemon(new PendingReplicationMonitor());
    timerThread.start();
  }
複製代碼

經過PendingReplicationBlocks#timerThread持有實際的PendingReplicationMonitor線程。

PendingReplicationMonitor線程後文單獨用一個分支分析。

激活DatanodeManager:DatanodeManager#activate()

DatanodeManager#activate()啓動DecommissionManager#Monitor、HeartbeatManager#Monitor:

void activate(final Configuration conf) {
    final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
    this.decommissionthread = new Daemon(dm.new Monitor(
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
    // 啓動DecommissionManager#Monitor
    decommissionthread.start();

    // 激活HeartbeatManager
    heartbeatManager.activate(conf);
  }
複製代碼

Decommission指datanode的下線操做,暫不關注。主要看HeartbeatManager相關。

HeartbeatManager#activate():

void activate(Configuration conf) {
    heartbeatThread.start();
  }
複製代碼

HeartbeatManager#heartbeatThread是一個HeartbeatManager#Monitor線程。

HeartbeatManager#Monitor線程後文單獨用一個分支分析。

啓動ReplicationMonitor

BlockManager#replicationThread是一個BlockManager#ReplicationMonitor線程。

BlockManager#ReplicationMonitor線程後文單獨用一個分支分析。

下面分別分析PendingReplicationMonitor線程、HeartbeatManager#Monitor線程、BlockManager#ReplicationMonitor線程,三者與namenode上數據塊的狀態管理密切相關。理解了這三個線程的交互關係,有助於之後理解其餘數據塊狀態轉換邏輯。

PendingReplicationMonitor線程

PendingReplicationMonitor#run():

public void run() {
      while (fsRunning) {
        long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);
        try {
          // 檢查正在複製的數據塊是否超時
          pendingReplicationCheck();
          Thread.sleep(period);
        } catch (InterruptedException ie) {
          if(LOG.isDebugEnabled()) {
            LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);
          }
        }
      }
    }
複製代碼

按期執行PendingReplicationMonitor#pendingReplicationCheck()檢查正在複製的數據塊是否超時:

void pendingReplicationCheck() {
      synchronized (pendingReplications) {
        Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
                                    pendingReplications.entrySet().iterator();
        long now = now();
        if(LOG.isDebugEnabled()) {
          LOG.debug("PendingReplicationMonitor checking Q");
        }
        // 遍歷`正在複製數據塊緩衝區`
        while (iter.hasNext()) {
          Map.Entry<Block, PendingBlockInfo> entry = iter.next();
          PendingBlockInfo pendingBlock = entry.getValue();
          // 若是數據塊超時,則將其添加到`複製超時數據塊緩衝區`
          if (now > pendingBlock.getTimeStamp() + timeout) {
            Block block = entry.getKey();
            synchronized (timedOutItems) {
              timedOutItems.add(block);
            }
            LOG.warn("PendingReplicationMonitor timed out " + block);
            iter.remove();
          }
        }
      }
    }
複製代碼

遍歷正在複製數據塊緩衝區BlockManager#pendingReplications若是發現有數據塊超時,則將數據塊添加到複製超時數據塊緩衝區BlockManager#pendingReplications#timedOutItems

超時指超過${dfs.namenode.replication.pending.timeout-sec}毫秒(儘管後綴是「sec」,但代碼邏輯是毫秒)仍沒有收到副本複製成功的響應。

HeartbeatManager#Monitor線程

HeartbeatManager#Monitor#run():

public void run() {
      while(namesystem.isRunning()) {
        try {
          final long now = Time.now();
          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
            // 根據心跳信息檢查節點與數據目錄
            heartbeatCheck();
            lastHeartbeatCheck = now;
          }
          ...// 安全認證相關,暫時忽略
        } catch (Exception e) {
          LOG.error("Exception while checking heartbeat", e);
        }
        try {
          Thread.sleep(5000);  // 5 seconds
        } catch (InterruptedException ie) {
        }
      }
    }
複製代碼

按期執行HeartbeatManager#heartbeatCheck()根據心跳信息檢查節點與數據目錄:

void heartbeatCheck() {
    final DatanodeManager dm = blockManager.getDatanodeManager();
    // 若是處於啓動過程當中的safemode狀態,則不進行檢查(空檢查)
    if (namesystem.isInStartupSafeMode()) {
      return;
    }
    // 下述邏輯保證每次只移除一個datanode或數據目錄
    boolean allAlive = false;
    while (!allAlive) {
      // 記錄第一個死亡的datanode
      DatanodeID dead = null;
      // 記錄非死亡datanode上的第一個失敗數據目錄
      DatanodeStorageInfo failedStorage = null;

      // 檢查節點與數據目錄
      int numOfStaleNodes = 0;
      int numOfStaleStorages = 0;
      synchronized(this) {
        for (DatanodeDescriptor d : datanodes) {
          // 檢查節點是否死亡或不新鮮
          if (dead == null && dm.isDatanodeDead(d)) {
            stats.incrExpiredHeartbeats();
            dead = d;
          }
          if (d.isStale(dm.getStaleInterval())) {
            numOfStaleNodes++;
          }
          // 若是節點存儲且新鮮,則檢查節點上的全部數據目錄是否不新鮮或失敗
          DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
          for(DatanodeStorageInfo storageInfo : storageInfos) {
            if (storageInfo.areBlockContentsStale()) {
              numOfStaleStorages++;
            }

            if (failedStorage == null &&
                storageInfo.areBlocksOnFailedStorage() &&
                d != dead) {
              failedStorage = storageInfo;
            }
          }

        }
        
        dm.setNumStaleNodes(numOfStaleNodes);
        dm.setNumStaleStorages(numOfStaleStorages);
      }

      // 更新allAlive,若是所有經過了檢查,則退出循環
      allAlive = dead == null && failedStorage == null;
      if (dead != null) {
        ...// 移除第一個死亡數據節點及與其關聯的副本
      }
      if (failedStorage != null) {
        ...// 移除第一個失敗數據目錄關聯的副本
      }
    }
  }
複製代碼

在datanode、數據目錄兩個粒度進行檢查,以肯定其是否死亡或不新鮮。若是datanode死亡,則移除該datanode,且再也不須要刪除該datanode上的副本,從須要刪除數據塊緩衝區BlockManager#excessReplicateMap正在刪除數據塊緩衝區BlockManager#invalidateBlocks中移除相關副本。若是有數據目錄失敗,則只須要移除與其關聯的副本。

  • 不新鮮是介於存活與死亡之間的一個狀態。不新鮮與死亡狀態都經過心跳間隔判斷,閾值不一樣。
  • 猴子有時候說「副本」,有時候說「數據塊」,可能會讓讀者感到糊塗。與數據塊相比,副本與具體的某個datanode綁定。能夠理解爲,數據塊一個抽象概念,對應多個副本,每一個副本與一個datanode綁定

BlockManager#ReplicationMonitor線程

BlockManager#ReplicationMonitor#run():

public void run() {
      while (namesystem.isRunning()) {
        try {
          // Process replication work only when active NN is out of safe mode.
          if (namesystem.isPopulatingReplQueues()) {
            // 計算副本複製與副本刪除任務
            computeDatanodeWork();
            // 處理過時的副本複製任務
            processPendingReplications();
          }
          Thread.sleep(replicationRecheckInterval);
        } catch (Throwable t) {
          ...// 異常處理
        }
      }
    }
複製代碼

若是開啓了HA,則_當且僅當當前namenode處於active狀態,且safemode關閉的狀態下,FsNamesystem#isPopulatingReplQueues()纔會返回true_;不然返回false。

根據前文對NameNode.<init>()方法的分析,若是未開啓HA,namenode將被標記爲active狀態;啓動過程當中,將打開safemode。那麼,什麼時候纔會關閉safemode呢?等到彙報的數據塊的比例超過設置的閾值,就會關閉safemode,標誌着namenode啓動過程的完成。

如今啓動一個datenode(猴子以前向集羣上傳過幾個文件),等待第一次數據塊彙報完成後,繼續來分析BlockManager#computeDatanodeWork()與BlockManager#processPendingReplications()。

BlockManager#computeDatanodeWork()

BlockManager#computeDatanodeWork():

int computeDatanodeWork() {
    if (namesystem.isInSafeMode()) {
      return 0;
    }

    // 流控相關
    final int numlive = heartbeatManager.getLiveDatanodeCount();
    final int blocksToProcess = numlive
        * this.blocksReplWorkMultiplier;
    final int nodesToProcess = (int) Math.ceil(numlive
        * this.blocksInvalidateWorkPct);

    // 計算可進行副本複製的任務,最後返回任務數
    int workFound = this.computeReplicationWork(blocksToProcess);

    // 更新狀態等
    namesystem.writeLock();
    try {
      this.updateState();
      this.scheduledReplicationBlocksCount = workFound;
    } finally {
      namesystem.writeUnlock();
    }
    // 計算可進行副本刪除的任務,最後返回任務數
    workFound += this.computeInvalidateWork(nodesToProcess);
    // 返回總任務數
    return workFound;
  }
複製代碼

BlockManager#computeReplicationWork()遍歷須要複製數據塊緩衝區BlockManager#neededReplications,計算可進行副本複製的任務,放入正在複製數據塊緩衝區BlockManager#pendingReplications,最後返回任務數。

BlockManager#computeInvalidateWork()遍歷須要刪除數據塊緩衝區BlockManager#excessReplicateMap,計算可進行副本刪除的任務,放入正在刪除數據塊緩衝區BlockManager#invalidateBlocks,最後返回任務數。

namenode對副本複製和副本刪除作了一些簡單的「流控」(將工做理解爲網絡流量,對工做數的控制):

  • 對副本複製:限制每批次進行副本複製的數據塊總數,最多爲集羣存活datanode總數的${dfs.namenode.replication.work.multiplier.per.iteration}倍,默認爲2。
  • 對副本刪除:限制每批次進行副本刪除涉及的datanode總數,最多爲集羣存活datanode總數的${dfs.namenode.invalidate.work.pct.per.iteration}倍,默認爲32%;限制每批次涉及的每一個datanode上刪除的副本總數,最多爲${dfs.block.invalidate.limit},默認爲1000。

例如集羣有1000個存活節點,使用默認參數,則每批次最多建立1000 * 2 = 2000個數據塊的副本複製工做,最多建立1000 * 32% * 1000 = 32w個副本。

能夠看到,副本複製的任務數上限遠大於副本刪除。這是由於,副本複製須要在datanode之間複製數據塊,佔用大量網絡資源,若是不限制同時進行的副本複製任務數,很容易形成網絡擁塞,影響整個集羣的性能;而副本刪除只涉及datanode內部的操做,甚至,對於大部分操做系統而言,文件remove操做只須要操做相似文件分配表(File Allocation table,FAT)的結構,成本很是小。

BlockManager#processPendingReplications()

BlockManager#processPendingReplications():

private void processPendingReplications() {
    // 獲取所有的複製超時數據塊,並清空複製超時數據塊緩衝區
    Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
    if (timedOutItems != null) {
      namesystem.writeLock();
      try {
        for (int i = 0; i < timedOutItems.length; i++) {
          // 計算各狀態的副本數
          NumberReplicas num = countNodes(timedOutItems[i]);
          // 若是過時的待複製數據塊仍然須要被複制,則將其添加回須要複製數據塊緩衝區
          if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
                                 num.liveReplicas())) {
            neededReplications.add(timedOutItems[i],
                                   num.liveReplicas(),
                                   num.decommissionedReplicas(),
                                   getReplication(timedOutItems[i]));
          }
        }
      } finally {
        namesystem.writeUnlock();
      }
    }
  }
  
  ...
  
  private boolean isNeededReplication(Block b, int expected, int current) {
    // 若是當前的存活副本數小於副本系數,或數據塊沒有足夠的機架分佈,就須要繼續複製
    return current < expected || !blockHasEnoughRacks(b);
  }
複製代碼

BlockManager#processPendingReplications()每次都_從複製超時數據塊緩衝區BlockManager#pendingReplications#timedOutItems中取出所有的複製超時數據塊,若是這些數據塊還須要被複制,則將其從新加入須要複製數據塊緩衝區BlockManager#neededReplications_。等待BlockManager#ReplicationMonitor線程在下一批次計算這些副本複製任務。

至此,namenode啓動流程的源碼已經走完。

總結

datanode的主要責任是數據塊(文件內容)的讀寫,所以,datanode的啓動流程主要關注的是與客戶端、namenode通訊的工做線程,底層的存儲管理機制等。

namenode的主要責任是文件元信息與數據塊映射的管理。相應的,namenode的啓動流程須要關注與客戶端、datanode通訊的工做線程,文件元信息的管理機制,數據塊的管理機制等。其中,RpcServer主要負責與客戶端、datanode通訊,FSDirectory主要負責管理文件元信息,兩者中的難點分別爲RPC機制和命名空間備份機制,本文簡單說起,沒有深刻。重點筆墨放在了namenode對數據塊的管理上,即namenode上數據塊的關鍵狀態轉換

image.png

上圖使用dot語言 + graphiz + sublime text 3 + graphiz-prefiew製做,簡潔靈活逼格高,今後愛上狀態機。

解釋一下上圖:

  • 開始狀態爲complete(由於咱們尚未分析namenode上的寫數據塊流程),結束狀態爲completenone
  • none表示刪除後的狀態。
  • none狀態外,各狀態對應着總覽中的各緩衝區。

紅線組成了副本複製的關鍵流程;藍線組成了副本刪除的關鍵流程。與副本複製流程相比,副本刪除流程不須要區分timeout狀態的數據塊(相似複製超時數據塊緩衝區BlockManager#pendingReplications#timedOutItems),更不須要區分刪除失敗等狀態。這是由於,副本刪除命令被髮出後,namenode即認爲副本刪除成功若是實際上刪除失敗(超時等緣由),datanode必然會再次彙報目標數據塊,namenode發現已經有足夠的存活副本,則將目標數據塊再次加入須要刪除數據塊緩衝區BlockManager#excessReplicateMap,即數據塊再次轉爲excess_replicate狀態

此處並非完整的NNBlock狀態機(仿照Yarn中RMApp的命名),隨着之後的分析,還要引入「數據塊正在寫」、「數據塊損壞」等狀態。

吐槽

最近狀態很差,近一週看不下去源碼。看源碼太缺乏成就感,時間長了實在熬人,,,有沒有建議?


本文連接:源碼|HDFS之NameNode:啓動過程
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。

相關文章
相關標籤/搜索