在Spark的RDD中引入過lineage這一律念。指的是RDD之間的依賴。而Alluxio則使用lineage來表示文件之間的依賴。在代碼層面,指的是fileID之間的依賴。數據結構
代碼中的註釋指出:ide
* A lineage tracks the dependencies imposed by a job, including the input files the job depends on,
* and the output files the job generates.
內部數據結構:
@NotThreadSafe public final class Lineage implements JournalEntryRepresentable { private final long mId; private final List<Long> mInputFiles; private final List<Long> mOutputFiles; private final Job mJob; private final long mCreationTimeMs; }
有了lineage以後,如何使用lineage來實現文件的容錯呢?this
在源碼中,有2個關鍵類解決了這個問題,分別是LineageMaster和RecomputeExecutor。spa
有幾個問題:線程
1. 何時啓動LineageMaster?日誌
按道理,在Alluxio集羣啓動時,LineageMaster進程就應該已經「啓動」了。 code
查找LineageMaster的調用狀況,能夠發如今alluxio.master.AlluxioMaster對象中,main方法裏會調用master.start();對象
順着這個思路,找到startMasters方法,就能夠發現以下代碼:blog
if (LineageUtils.isLineageEnabled(MasterContext.getConf())) { mLineageMaster.start(isLeader); }
也就是說,在lineage使能的狀況下,纔會啓動mLineageMaster.接口
LineageMaster的依賴:
LineageMaster的啓動過程是這樣的:
@Override public void start(boolean isLeader) throws IOException { super.start(isLeader); if (isLeader) { mCheckpointExecutionService = getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.MASTER_CHECKPOINT_SCHEDULING, new CheckpointSchedulingExcecutor(this, mFileSystemMaster), mConfiguration.getInt(Constants.MASTER_LINEAGE_CHECKPOINT_INTERVAL_MS))); mRecomputeExecutionService = getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.MASTER_FILE_RECOMPUTATION, new RecomputeExecutor(new RecomputePlanner(mLineageStore, mFileSystemMaster), mFileSystemMaster), mConfiguration.getInt(Constants.MASTER_LINEAGE_RECOMPUTE_INTERVAL_MS))); } }
先分析LineageMaster.start()除了super.start(isLeader)的代碼。
根據這部分代碼,能夠分析出,LineageMaster只會在leader上啓動,而不會在standby master上啓動。
啓動包含2個Executor服務的運行,運行在這2個Executor服務上的是2個心跳線程(HeartBeatThread)。
這2個心跳線程所作的事情分別是:
checkpoint執行服務所作的是Master Checkpoint Scheduling。executor是CheckpointSchedulingExcecutor。對應的Timer的interval是由「alluxio.master.lineage.checkpoint.interval.ms」所設定。
而RecomputeExecutionService所作的事情則是文件的重計算(MASTER_FILE_RECOMPUTATION).
在啓動這2個服務以前,還會調用父類的start方法:
先了解下AbstractMaster類的功能:
* This is the base class for all masters, and contains common functionality. Common functionality
* mostly consists of journal operations, like initialization, journal tailing when in standby mode,
* or journal writing when the master is the leader.
分析start方法的代碼,首先會起一個固定線程數的線程池。而後,若是是leader,那麼就會作以下事情:
1.判斷對應的日誌是否是讀寫日誌;
2.將全部的日誌標記爲「Complete」;
3.執行全部的日誌文件中的Entry;
4.初始化日誌,把全部完成日誌的狀態寫入檢查點文件。
AbstractMaster在此處會調用子類的streamToJournalCheckpoint方法。
LineageMaster實現了JournalCheckpointStreamable接口,對應的streamToJournalCheckpoint方法調用LineStore的streamToJournalCheckpoint方法,將依賴轉換成日誌條目寫入JournalOutputStream.
2.RecomputeExecutor已經在LineageMaster中例化過了,那麼它的做用是什麼?
RecomputeExecutor包含一個內部類:RecomputeLauncher。
這個內部類實現了runnable接口,這意味着這個類的實例將會被一個線程執行。
該類的run方法作的事情是根據RecomputePlan類對象的依賴,循環地執行對應的任務:
a 初始化文件
b getJob().run()
總結,能夠看出,lineage的實現與一般的預期較爲相符。Master提供了穩定的重算服務,重算服務則由包含有RecomputeExecutor功能的HeartbeatThread實現。
Master通用的功能在AbstractMaster中提出甚至實現。從當前的線索出發,alluxio更多的方面能夠挖掘出來。