體會 Hadoop 數據節點升級機制中的設計之美

1、前言

Hadoop數據節點的升級機制,深刻了解下發現設計很是的優美,在此分享給你們。升級機制最重要的部分就是升級過程當中的故障恢復。咱們來看下它是怎麼被解決的。java

關鍵點:node

  1. 升級過程生成臨時目錄,標識中間狀態
  2. 啓動時分析出當前數據節點的存儲空間狀態
  3. 根據存儲空間狀態執行相應的操做使數據節點恢復正常

接下來咱們按關鍵點分析,看看它究竟是怎麼實現的。其中涉及到很多 Hadoop 源代碼的分析,關鍵代碼我會貼出來,若是你們想看完整的能夠自行下載瀏覽。apache

 

2、數據節點的升級

數據節點的升級包含三個很是重要的步驟:​​​​​​升級、升級提交和升級回滾。咱們先看下他們之間的聯繫。函數

  • 升級

下面代碼在 Hadoop 1.0.0 版本下的 DataStorage.java 文件中。文件路徑爲:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\datanode。oop

void doUpgrade(StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
    ...
    // enable hardlink stats via hardLink object instance
    HardLink hardLink = new HardLink();
    
    File curDir = sd.getCurrentDir();
    File prevDir = sd.getPreviousDir();
    assert curDir.exists() : "Current directory must exist.";
    // delete previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
    // rename current to tmp
    rename(curDir, tmpDir);
    // hardlink blocks
    linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
    // write version file
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    assert this.namespaceID == nsInfo.getNamespaceID() :
      "Data-node and name-node layout versions must be the same.";
    this.cTime = nsInfo.getCTime();
    sd.write();
    // rename tmp to previous
    rename(tmpDir, prevDir);
    ...
  }

上面的代碼並非特別難。咱們看最關鍵的幾個步驟:this

  1. 將數據節點當前數據的存儲目錄(current) 更名爲臨時目錄(previous.tmp)
  2. 生成新的 current 目錄,並將當前目錄下全部文件使用硬連接的模式連接到 previous.tmp 目錄中的文件
  3. 將 VERSION 文件寫入到 current 目錄
  4. 將臨時目錄(previous.tmp)更名爲 previous 目錄

經過以上幾個步驟咱們發如今升級過程當中作的操做並不複雜,咱們關鍵要注意臨時目錄(previous.tmp),由於它是後續判斷升級是否異常的關鍵依據。spa

  • 升級提交

咱們再來看下升級提交的代碼,代碼仍然在 DataStorage.java 文件中。線程

void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    ...
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();
    // rename previous to tmp
    rename(prevDir, tmpDir);

    // delete tmp dir in a separate thread
    new Daemon(new Runnable() {
        public void run() {
          try {
            deleteDir(tmpDir);
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
        }
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }

經過代碼發現,升級提交流程很是的簡單,只有關鍵的兩步:設計

  1. 將 previous 目錄重命名爲 臨時目錄(finalized.tmp)
  2. 將臨時目錄(finalized.tmp)利用單獨線程刪除

經過上面咱們發現即便是簡單的將一個目錄刪除 Hadoop 也分了兩步,而且還須要一個臨時目錄做爲中間狀態code

  • 升級回滾

最後咱們再來看下升級回滾的代碼,一樣在 DataStorage.java 文件中。

void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (!prevDir.exists())
      return;
    DataStorage prevInfo = new DataStorage();
    StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
    prevSD.read(prevSD.getPreviousVersionFile());

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
          && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
      throw new InconsistentFSStateException(prevSD.getRoot(),
                                             "Cannot rollback to a newer state.\nDatanode previous state: LV = " 
                                             + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
                                             + " is newer than the namespace state: LV = "
                                             + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
    ...
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  }

一樣咱們來分析下上面的代碼,找出其中的關鍵步驟:

  1. 根據版本信息判斷是否能夠進行回滾操做
  2. 將當前目錄(current)重命名爲臨時目錄(removed.tmp)
  3. 將以前一個版本的目錄(previous)重命名爲當前目錄(current)
  4. 刪除臨時目錄(removed.tmp)

咱們發現回滾操做也利用了臨時文件夾(removed.tmp)。

經過上面的分析咱們已經清晰的知道數據節點升級的三個關鍵操做。接下來咱們繼續瞭解當在升級過程當中發生異常時,數據節點又是如何處理的。

 

3、分析存儲空間狀態

經過上面的分析咱們發現每一個步驟都會生成像 "previous.tmp、removed.tmp"這樣的臨時目錄,它們的做用是什麼呢?

咱們發如今執行升級、回滾等操做時都須要進行必定的操做,若是在作這些操做的時候設備出現故障(如斷電)那麼存儲空間就會處於一箇中間狀態。引入上述的這些臨時目錄就能判斷異常發生在什麼操做的什麼狀態,這樣就會方便後續的故障恢復。

數據節點在啓動的時候會對當前節點的存儲空間進行分析,得出存儲空間的狀態,而後根據不一樣的狀態執行不一樣的操做。若是發現分析出的狀態不是正常狀態,存在中間狀態或異常狀態(例如:發現升級過程當中的臨時目錄),則啓動 recovery 進行恢復。

咱們看下這部分的關鍵代碼,這部分代碼仍然在 DataStorage.java 文件中。

void recoverTransitionRead(NamespaceInfo nsInfo,
                             Collection<File> dataDirs,
                             StartupOption startOpt
                             ) throws IOException {
    ...
    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
      File dataDir = it.next();
      StorageDirectory sd = new StorageDirectory(dataDir);
      StorageState curState;
      try {
        curState = sd.analyzeStorage(startOpt);      // 分析當前存儲狀態
        // sd is locked but not opened
        switch(curState) {
        case NORMAL:
          break;
        case NON_EXISTENT:
          // ignore this storage
          LOG.info("Storage directory " + dataDir + " does not exist.");
          it.remove();
          continue;
        case NOT_FORMATTED: // format
          LOG.info("Storage directory " + dataDir + " is not formatted.");
          LOG.info("Formatting ...");
          format(sd, nsInfo);
          break;
        default:  // recovery part is common
          sd.doRecover(curState);                    // 發現中間或異常狀態,進行恢復
        }
      } catch (IOException ioe) {
    ...
  }

明白了這個流程,接下來咱們就深刻了解下 analyzeStorage 的具體內容和總共有多少種狀態。

  • 存儲空間狀態

經過代碼咱們發現存儲空間的狀態總共有如下幾種:

public enum StorageState {
    NON_EXISTENT,
    NOT_FORMATTED,
    COMPLETE_UPGRADE,
    RECOVER_UPGRADE,
    COMPLETE_FINALIZE,
    COMPLETE_ROLLBACK,
    RECOVER_ROLLBACK,
    COMPLETE_CHECKPOINT,
    RECOVER_CHECKPOINT,
    NORMAL;
  }
  ...
  // Startup options
  static public enum StartupOption{
    FORMAT  ("-format"),   //格式化系統
    REGULAR ("-regular"),  //正常啓動HDFS
    UPGRADE ("-upgrade"),  //升級系統
    ROLLBACK("-rollback"), //從升級中回滾到前一個版本
    FINALIZE("-finalize"), //提交一次升級
    IMPORT  ("-importCheckpoint");// 從名字節點的一個檢查點恢復
    ...
  }

咱們發現上面提到的全部存儲空間狀態和當前施加在存儲空間的動做相關。其中只有部分是和升級相關的。

  • 分析出當前存儲空間狀態

接下來咱們好好研究下這些狀態是怎麼獲得的?

要回答上面咱們提出的問題,就必須好看下狀態分析函數:analyzeStorage。這函數在 Storage.java 文件中。路徑爲:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\common。

public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
      ...
      // check whether current directory is valid
      File versionFile = getVersionFile();
      boolean hasCurrent = versionFile.exists();

      // check which directories exist
      boolean hasPrevious = getPreviousDir().exists();
      boolean hasPreviousTmp = getPreviousTmp().exists();
      boolean hasRemovedTmp = getRemovedTmp().exists();
      boolean hasFinalizedTmp = getFinalizedTmp().exists();
      boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

      if (!(hasPreviousTmp || hasRemovedTmp
          || hasFinalizedTmp || hasCheckpointTmp)) {
        // no temp dirs - no recovery
        if (hasCurrent)
          return StorageState.NORMAL;
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                              "version file in current directory is missing.");
        return StorageState.NOT_FORMATTED;
      }

      if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
          + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
        // more than one temp dirs
        throw new InconsistentFSStateException(root,
                                               "too many temporary directories.");

      // # of temp dirs == 1 should either recover or complete a transition
      if (hasCheckpointTmp) {
        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                          : StorageState.RECOVER_CHECKPOINT;
      }

      if (hasFinalizedTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                                                 + "cannot exist together.");
        return StorageState.COMPLETE_FINALIZE;
      }

      if (hasPreviousTmp) {
        if (hasPrevious)
          throw new InconsistentFSStateException(root,
                                                 STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                                                 + " cannot exist together.");
        if (hasCurrent)
          return StorageState.COMPLETE_UPGRADE;
        return StorageState.RECOVER_UPGRADE;
      }
      
      assert hasRemovedTmp : "hasRemovedTmp must be true";
      if (!(hasCurrent ^ hasPrevious))
        throw new InconsistentFSStateException(root,
                                               "one and only one directory " + STORAGE_DIR_CURRENT 
                                               + " or " + STORAGE_DIR_PREVIOUS 
                                               + " must be present when " + STORAGE_TMP_REMOVED
                                               + " exists.");
      if (hasCurrent)
        return StorageState.COMPLETE_ROLLBACK;
      return StorageState.RECOVER_ROLLBACK;
    }

經過上面的代碼咱們就能夠清晰的知道每一個存儲空間狀態是如何獲得的。這裏咱們重點解釋下 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 這兩個存儲空間狀態。

  1. COMPLETE_UPGRADE:「previous.tmp」目錄存在,同時「current」目錄下的「VERSION」文件存在,可完成升級
  2. RECOVER_UPGRADE:「previous.tmp」目錄存在,「current/VERSION」文件不存在,存儲空間應該從升級過程當中恢復

其餘的狀態就不作一一分析,你們根據代碼應該很容易就能夠得出各個狀態成立的條件。

 

4、根據存儲空間狀態進行恢復

獲得了分析後的存儲空間狀態,咱們就能夠根據不一樣的狀態將存儲系統恢復正常。咱們來看下這部分代碼,這部分代碼 仍然在 Storage.java 文件中。

public void doRecover(StorageState curState) throws IOException {
      File curDir = getCurrentDir();
      String rootPath = root.getCanonicalPath();
      switch(curState) {
      case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
        LOG.info("Completing previous upgrade for storage directory " 
                 + rootPath + ".");
        rename(getPreviousTmp(), getPreviousDir());
        return;
      case RECOVER_UPGRADE:   // mv previous.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous upgrade.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getPreviousTmp(), curDir);
        return;
      case COMPLETE_ROLLBACK: // rm removed.tmp
        LOG.info("Completing previous rollback for storage directory "
                 + rootPath + ".");
        deleteDir(getRemovedTmp());
        return;
      case RECOVER_ROLLBACK:  // mv removed.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from previous rollback.");
        rename(getRemovedTmp(), curDir);
        return;
      case COMPLETE_FINALIZE: // rm finalized.tmp
        LOG.info("Completing previous finalize for storage directory "
                 + rootPath + ".");
        deleteDir(getFinalizedTmp());
        return;
      case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
        LOG.info("Completing previous checkpoint for storage directory " 
                 + rootPath + ".");
        File prevCkptDir = getPreviousCheckpoint();
        if (prevCkptDir.exists())
          deleteDir(prevCkptDir);
        rename(getLastCheckpointTmp(), prevCkptDir);
        return;
      case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
        LOG.info("Recovering storage directory " + rootPath
                 + " from failed checkpoint.");
        if (curDir.exists())
          deleteDir(curDir);
        rename(getLastCheckpointTmp(), curDir);
        return;
      default:
        throw new IOException("Unexpected FS state: " + curState);
      }
    }

這部分代碼很是容易理解,就是根據不一樣的狀態執行不一樣的操做。咱們仍然只分析兩個關鍵的狀態 COMPLETE_UPGRADE 和 RECOVER_UPGRADE 對應的操做。

  1. COMPLETE_UPGRADE:將臨時目錄「previous.tmp」更名爲「previous」
  2. RECOVER_UPGRADE:將臨時目錄「previous.tmp」更名爲「current」

這兩個狀態對應的操做也很好理解,分別是完成未完成的升級操做和回退未完成的升級操做。

5、總結

以上升級機制的關鍵就在於在作各類升級操做的時候很好的利用臨時文件夾,方便後續分析當前存儲空間狀態和對異常狀況進行恢復。

同時也很好的利用了存儲空間的狀態機機制,這樣很好的下降了各類狀態之間的耦合性。

關鍵點:

  • 臨時文件夾
  • 存儲空間狀態機
相關文章
相關標籤/搜索