The upgrade mechanism of the Hadoop DataNode turns out to be elegantly designed once you dig into it, so I'd like to share it here. The most important part of the upgrade mechanism is failure recovery during the upgrade process. Let's look at how that problem is solved.
Key points:
1. A DataNode upgrade involves three operations: upgrade, upgrade finalize, and upgrade rollback.
2. Every operation goes through a temporary directory (previous.tmp, finalized.tmp, removed.tmp) that records how far the operation got.
3. At startup the DataNode analyzes its storage directories, derives their state, and recovers from any interrupted operation based on that state.
Next, let's go through these key points one by one and see how they are actually implemented. This involves a fair amount of Hadoop source code; I will paste the key code here, and if you want the complete version you can download the source and browse it yourself.
A DataNode upgrade involves three very important operations: upgrade, upgrade finalize, and upgrade rollback. Let's first look at how they relate to each other: the upgrade preserves the pre-upgrade data as a previous snapshot, finalize permanently discards that snapshot once the new version is accepted, and rollback restores the snapshot back to current if the new version has to be abandoned.
The code below is from the DataStorage.java file in Hadoop 1.0.0, located at hadoop\src\hdfs\org\apache\hadoop\hdfs\server\datanode.
void doUpgrade(StorageDirectory sd,
               NamespaceInfo nsInfo
               ) throws IOException {
  ...
  // enable hardlink stats via hardLink object instance
  HardLink hardLink = new HardLink();

  File curDir = sd.getCurrentDir();
  File prevDir = sd.getPreviousDir();
  assert curDir.exists() : "Current directory must exist.";
  // delete previous dir before upgrading
  if (prevDir.exists())
    deleteDir(prevDir);
  File tmpDir = sd.getPreviousTmp();
  assert !tmpDir.exists() : "previous.tmp directory must not exist.";
  // rename current to tmp
  rename(curDir, tmpDir);
  // hardlink blocks
  linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
  // write version file
  this.layoutVersion = FSConstants.LAYOUT_VERSION;
  assert this.namespaceID == nsInfo.getNamespaceID() :
    "Data-node and name-node layout versions must be the same.";
  this.cTime = nsInfo.getCTime();
  sd.write();
  // rename tmp to previous
  rename(tmpDir, prevDir);
  ...
}
The code above is not particularly difficult. Let's look at the most important steps:
1. Rename the current directory to the temporary directory previous.tmp.
2. Rebuild current by hard-linking the block files from previous.tmp into it (linkBlocks), so no block data is actually copied.
3. Write the new VERSION file (new layout version and cTime) into current.
4. Rename previous.tmp to previous, which marks the upgrade as complete.
From these steps we can see that the work done during an upgrade is not complicated. The key thing to pay attention to is the temporary directory (previous.tmp), because its presence is what later tells us whether an upgrade was interrupted abnormally.
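One detail worth calling out is the hard-linking in step 2: because the block files are linked rather than copied, previous and current share the same on-disk data, and the snapshot costs almost no extra space or time. Below is a minimal sketch of the idea, not Hadoop's HardLink/linkBlocks implementation, just an illustration using java.nio.file (class and method names are made up):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HardLinkSketch {
  // Recursively recreate the directory tree under "to", hard-linking every
  // regular file from "from" instead of copying its bytes.
  static void linkTree(Path from, Path to) throws IOException {
    Files.createDirectories(to);
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(from)) {
      for (Path src : entries) {
        Path dst = to.resolve(src.getFileName());
        if (Files.isDirectory(src)) {
          linkTree(src, dst);            // recurse into subdirectories
        } else {
          Files.createLink(dst, src);    // hard link: the file data is not duplicated
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // usage: java HardLinkSketch <from-dir> <to-dir>
    linkTree(Paths.get(args[0]), Paths.get(args[1]));
  }
}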
Now let's look at the upgrade finalize code, which is also in DataStorage.java.
void doFinalize(StorageDirectory sd) throws IOException {
  File prevDir = sd.getPreviousDir();
  if (!prevDir.exists())
    return; // already discarded
  final String dataDirPath = sd.getRoot().getCanonicalPath();
  ...
  assert sd.getCurrentDir().exists() : "Current directory must exist.";
  final File tmpDir = sd.getFinalizedTmp();
  // rename previous to tmp
  rename(prevDir, tmpDir);

  // delete tmp dir in a separate thread
  new Daemon(new Runnable() {
      public void run() {
        try {
          deleteDir(tmpDir);
        } catch(IOException ex) {
          LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
        }
        LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
      }
      public String toString() { return "Finalize " + dataDirPath; }
    }).start();
}
As the code shows, the finalize flow is very simple, with only two key steps:
1. Rename the previous directory to the temporary directory finalized.tmp.
2. Delete finalized.tmp in a separate daemon thread.
Notice that even for something as simple as deleting a directory, Hadoop splits the work into two steps and still goes through a temporary directory as an intermediate state. The rename is the real commit point: once previous has become finalized.tmp, the upgrade can no longer be rolled back, and the actual deletion can happen (or be retried after a crash) at any later time.
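Here is a minimal sketch of this rename-then-delete pattern outside of Hadoop (the class and method names are made up for illustration):

import java.io.File;
import java.io.IOException;

public class TwoPhaseDelete {
  // Finalize by first renaming "previous" to a temp dir (the commit point),
  // then deleting the temp dir off the critical path.
  static void finalizeDir(File previous, File finalizedTmp) throws IOException {
    if (!previous.exists())
      return;                                   // nothing to finalize, or already done
    if (!previous.renameTo(finalizedTmp))
      throw new IOException("rename failed: " + previous);
    Thread cleaner = new Thread(() -> deleteRecursively(finalizedTmp));
    cleaner.setDaemon(true);                    // cleanup must not block shutdown
    cleaner.start();
  }

  static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null)
      for (File c : children)
        deleteRecursively(c);
    f.delete();
  }
}

If the process dies before the rename, previous is still intact and finalize simply runs again; if it dies after, only finalized.tmp is left and recovery just finishes the delete (the COMPLETE_FINALIZE case we will see later).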
Finally, let's look at the upgrade rollback code, again in DataStorage.java.
void doRollback( StorageDirectory sd,
                 NamespaceInfo nsInfo
                 ) throws IOException {
  File prevDir = sd.getPreviousDir();
  // regular startup if previous dir does not exist
  if (!prevDir.exists())
    return;
  DataStorage prevInfo = new DataStorage();
  StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
  prevSD.read(prevSD.getPreviousVersionFile());

  // We allow rollback to a state, which is either consistent with
  // the namespace state or can be further upgraded to it.
  if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
        && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
    throw new InconsistentFSStateException(prevSD.getRoot(),
        "Cannot rollback to a newer state.\nDatanode previous state: LV = "
        + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
        + " is newer than the namespace state: LV = "
        + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
  ...
  File tmpDir = sd.getRemovedTmp();
  assert !tmpDir.exists() : "removed.tmp directory must not exist.";
  // rename current to tmp
  File curDir = sd.getCurrentDir();
  assert curDir.exists() : "Current directory must exist.";
  rename(curDir, tmpDir);
  // rename previous to current
  rename(prevDir, curDir);
  // delete tmp dir
  deleteDir(tmpDir);
  LOG.info("Rollback of " + sd.getRoot() + " is complete.");
}
Again, let's analyze the code above and pick out the key steps:
1. Check that the previous version can actually be rolled back to (its layout version and cTime must be compatible with the NameNode's state).
2. Rename current to the temporary directory removed.tmp.
3. Rename previous to current.
4. Delete removed.tmp.
We can see that the rollback operation also relies on a temporary directory (removed.tmp).
With the analysis above we now clearly understand the three key operations of a DataNode upgrade. Next, let's look at how the DataNode handles a failure that occurs during the upgrade process.
We have seen that every operation creates a temporary directory such as previous.tmp or removed.tmp. What are they for?
Upgrade, finalize, and rollback each involve a sequence of file-system operations, and if the machine fails in the middle of them (for example, a power outage), the storage directory is left in an intermediate state. The temporary directories introduced above make it possible to tell which operation was interrupted and at which point: for example, finding previous.tmp at startup means an upgrade was interrupted, and whether a valid current directory also exists tells us how far the upgrade got. This makes the subsequent failure recovery straightforward.
When the DataNode starts, it analyzes its storage directories, derives the state of each one, and then acts according to that state. If the derived state is not normal, that is, an intermediate or abnormal state is found (for example, a temporary directory left over from an upgrade), it runs recovery to repair it.
Let's look at the key code for this part, which is still in DataStorage.java.
void recoverTransitionRead(NamespaceInfo nsInfo,
                           Collection<File> dataDirs,
                           StartupOption startOpt
                           ) throws IOException {
  ...
  for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
    File dataDir = it.next();
    StorageDirectory sd = new StorageDirectory(dataDir);
    StorageState curState;
    try {
      curState = sd.analyzeStorage(startOpt);  // analyze the current storage state
      // sd is locked but not opened
      switch(curState) {
      case NORMAL:
        break;
      case NON_EXISTENT:
        // ignore this storage
        LOG.info("Storage directory " + dataDir + " does not exist.");
        it.remove();
        continue;
      case NOT_FORMATTED: // format
        LOG.info("Storage directory " + dataDir + " is not formatted.");
        LOG.info("Formatting ...");
        format(sd, nsInfo);
        break;
      default:  // recovery part is common
        sd.doRecover(curState);  // intermediate or abnormal state found: recover from it
      }
    } catch (IOException ioe) {
      ...
    }
Now that the overall flow is clear, let's dig into what analyzeStorage actually does and how many states there are in total.
From the code we can see that a storage directory can be in the following states:
public enum StorageState {
  NON_EXISTENT,
  NOT_FORMATTED,
  COMPLETE_UPGRADE,
  RECOVER_UPGRADE,
  COMPLETE_FINALIZE,
  COMPLETE_ROLLBACK,
  RECOVER_ROLLBACK,
  COMPLETE_CHECKPOINT,
  RECOVER_CHECKPOINT,
  NORMAL;
}
...
// Startup options
static public enum StartupOption{
  FORMAT  ("-format"),            // format the file system
  REGULAR ("-regular"),           // start HDFS normally
  UPGRADE ("-upgrade"),           // upgrade the file system
  ROLLBACK("-rollback"),          // roll back from an upgrade to the previous version
  FINALIZE("-finalize"),          // finalize (commit) an upgrade
  IMPORT  ("-importCheckpoint");  // recover from a NameNode checkpoint
  ...
}
All of the storage states listed above correspond to actions being applied to the storage directory, and only some of them are related to upgrades: COMPLETE_UPGRADE and RECOVER_UPGRADE for the upgrade itself, COMPLETE_FINALIZE for finalize, and COMPLETE_ROLLBACK and RECOVER_ROLLBACK for rollback.
Next, let's take a close look at how these states are derived.
To answer that question we need to look at the state analysis function, analyzeStorage. It lives in Storage.java, under hadoop\src\hdfs\org\apache\hadoop\hdfs\server\common.
public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
  ...
  // check whether current directory is valid
  File versionFile = getVersionFile();
  boolean hasCurrent = versionFile.exists();

  // check which directories exist
  boolean hasPrevious = getPreviousDir().exists();
  boolean hasPreviousTmp = getPreviousTmp().exists();
  boolean hasRemovedTmp = getRemovedTmp().exists();
  boolean hasFinalizedTmp = getFinalizedTmp().exists();
  boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

  if (!(hasPreviousTmp || hasRemovedTmp
        || hasFinalizedTmp || hasCheckpointTmp)) {
    // no temp dirs - no recovery
    if (hasCurrent)
      return StorageState.NORMAL;
    if (hasPrevious)
      throw new InconsistentFSStateException(root,
          "version file in current directory is missing.");
    return StorageState.NOT_FORMATTED;
  }

  if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
      + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
    // more than one temp dirs
    throw new InconsistentFSStateException(root,
        "too many temporary directories.");

  // # of temp dirs == 1 should either recover or complete a transition
  if (hasCheckpointTmp) {
    return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                      : StorageState.RECOVER_CHECKPOINT;
  }

  if (hasFinalizedTmp) {
    if (hasPrevious)
      throw new InconsistentFSStateException(root,
          STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
          + "cannot exist together.");
    return StorageState.COMPLETE_FINALIZE;
  }

  if (hasPreviousTmp) {
    if (hasPrevious)
      throw new InconsistentFSStateException(root,
          STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
          + " cannot exist together.");
    if (hasCurrent)
      return StorageState.COMPLETE_UPGRADE;
    return StorageState.RECOVER_UPGRADE;
  }

  assert hasRemovedTmp : "hasRemovedTmp must be true";
  if (!(hasCurrent ^ hasPrevious))
    throw new InconsistentFSStateException(root,
        "one and only one directory " + STORAGE_DIR_CURRENT
        + " or " + STORAGE_DIR_PREVIOUS
        + " must be present when " + STORAGE_TMP_REMOVED + " exists.");
  if (hasCurrent)
    return StorageState.COMPLETE_ROLLBACK;
  return StorageState.RECOVER_ROLLBACK;
}
From the code above we can clearly see how each storage state is derived. Let's focus on the two upgrade-related states, COMPLETE_UPGRADE and RECOVER_UPGRADE. Both require that previous.tmp exists and previous does not. If the current directory also contains a valid VERSION file, the state is COMPLETE_UPGRADE: the upgrade had already rebuilt current, and only the final rename of previous.tmp to previous is missing. If there is no valid current, the state is RECOVER_UPGRADE: the upgrade was interrupted before current was rebuilt, so the original data still lives in previous.tmp and must be restored.
I won't go through the remaining states one by one; from the code it should be easy to work out the condition under which each state holds.
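To make the two upgrade states concrete, here is a small self-contained demo (not Hadoop code; the directory names mirror the ones above, everything else is made up) that simulates the two crash points of doUpgrade and applies the same checks analyzeStorage uses:

import java.io.File;

public class UpgradeStateDemo {
  // Mirrors only the upgrade-related branches of analyzeStorage.
  static String analyze(File root) {
    boolean hasCurrent = new File(root, "current/VERSION").exists();
    boolean hasPreviousTmp = new File(root, "previous.tmp").exists();
    if (!hasPreviousTmp)
      return hasCurrent ? "NORMAL" : "NOT_FORMATTED";
    return hasCurrent ? "COMPLETE_UPGRADE" : "RECOVER_UPGRADE";
  }

  public static void main(String[] args) throws Exception {
    File root = new File("demo-storage");
    new File(root, "current").mkdirs();

    // Crash right after "rename current -> previous.tmp":
    new File(root, "current").renameTo(new File(root, "previous.tmp"));
    System.out.println(analyze(root));  // RECOVER_UPGRADE

    // Crash after current is rebuilt and the new VERSION file is written,
    // but before "rename previous.tmp -> previous":
    new File(root, "current").mkdirs();
    new File(root, "current/VERSION").createNewFile();
    System.out.println(analyze(root));  // COMPLETE_UPGRADE
  }
}

Running it prints RECOVER_UPGRADE and then COMPLETE_UPGRADE, which matches the recovery actions we will see next in doRecover.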
Once the storage state has been determined, we can restore the storage directory to normal according to that state. Let's look at this part of the code, which is also in Storage.java.
public void doRecover(StorageState curState) throws IOException {
  File curDir = getCurrentDir();
  String rootPath = root.getCanonicalPath();
  switch(curState) {
  case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
    LOG.info("Completing previous upgrade for storage directory "
             + rootPath + ".");
    rename(getPreviousTmp(), getPreviousDir());
    return;
  case RECOVER_UPGRADE:   // mv previous.tmp -> current
    LOG.info("Recovering storage directory " + rootPath
             + " from previous upgrade.");
    if (curDir.exists())
      deleteDir(curDir);
    rename(getPreviousTmp(), curDir);
    return;
  case COMPLETE_ROLLBACK: // rm removed.tmp
    LOG.info("Completing previous rollback for storage directory "
             + rootPath + ".");
    deleteDir(getRemovedTmp());
    return;
  case RECOVER_ROLLBACK:  // mv removed.tmp -> current
    LOG.info("Recovering storage directory " + rootPath
             + " from previous rollback.");
    rename(getRemovedTmp(), curDir);
    return;
  case COMPLETE_FINALIZE: // rm finalized.tmp
    LOG.info("Completing previous finalize for storage directory "
             + rootPath + ".");
    deleteDir(getFinalizedTmp());
    return;
  case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
    LOG.info("Completing previous checkpoint for storage directory "
             + rootPath + ".");
    File prevCkptDir = getPreviousCheckpoint();
    if (prevCkptDir.exists())
      deleteDir(prevCkptDir);
    rename(getLastCheckpointTmp(), prevCkptDir);
    return;
  case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
    LOG.info("Recovering storage directory " + rootPath
             + " from failed checkpoint.");
    if (curDir.exists())
      deleteDir(curDir);
    rename(getLastCheckpointTmp(), curDir);
    return;
  default:
    throw new IOException("Unexpected FS state: " + curState);
  }
}
This code is very easy to follow: it simply performs a different repair action for each state. Again we will only look at the actions for the two key states, COMPLETE_UPGRADE and RECOVER_UPGRADE.
The actions for these two states are also easy to understand: COMPLETE_UPGRADE finishes the interrupted upgrade by renaming previous.tmp to previous, while RECOVER_UPGRADE undoes the interrupted upgrade by renaming previous.tmp back to current.
The key to this upgrade mechanism is the careful use of temporary directories during each upgrade operation, which makes it easy to analyze the state of the storage directory afterwards and to recover from failures.
It also makes good use of a state machine over the storage directory, which nicely reduces the coupling between the different states and the operations that handle them.
Key points:
1. Upgrade, finalize, and rollback each drive the storage directory through a temporary directory (previous.tmp, finalized.tmp, removed.tmp), and a rename is the commit point of each operation.
2. At startup, analyzeStorage derives the storage state from which directories exist, and doRecover completes or undoes whatever operation was interrupted.