JobTracker是整個MapReduce計算框架中的主服務,至關於集羣的「管理者」,負責整個集羣的做業控制和資源管理。本文對JobTracker的啓動過程及心跳接收與應答兩個主要功能進行分析。 node
函數offerService()會啓動 JobTracker內部幾個比較重要的後臺服務進程,分別是expireTrackersThread、retireJobsThread、 expireLaunchingTaskThread和completedJobsStoreThread。相關代碼以下: 數據結構
public class JobTracker { ... ... ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, "expireLaunchingTasks"); ... ... public void offerService() throws InterruptedException, IOException { ... ... // expireTrackersThread後臺服務進程。 this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); this.expireTrackersThread.start(); // retireJobsThread後臺服務進程。 this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); this.retireJobsThread.start(); //expireLaunchingTaskThread後臺服務進程。 expireLaunchingTaskThread.start(); // completedJobsStoreThread後臺服務進程。 if (completedJobStatusStore.isActive()) { completedJobsStoreThread = new Thread(completedJobStatusStore, "completedjobsStore-housekeeper"); completedJobsStoreThread.start(); } ... ... } }
1) expireTrackersThread線程下面分別介紹這幾個服務線程。 併發
該線程主要用於發現和清理死掉的 TaskTracker。每一個TaskTracker會週期性地經過心跳向JobTracker彙報信息,而JobTracker會記錄每一個 TaskTracker最近的彙報心跳時間。若是某個TaskTracker在10分鐘內未彙報心跳,則JobTracker認爲它已死掉,並將經的相關 信息從數據結構trackToJobsToCleanup、trackerToTasksToCleanup、 trackerToMarkedTasksMap中清除,同時將正在運行的任務狀態標註爲KILLED_UNCLEAN。 app
2) retireJobsThread線程 框架
該線程主要用於清理長時間駐留在內存中的已經運行完成的做 業信息。JobTracker會將已經運行完成的做業信息存放到內存中,以便外部查詢,但隨着完成的做業愈來愈多,勢必會佔用JobTracker的大量 內存,爲此,JobTracker經過該線程清理駐留在內存中較長時間的已經運行完成的做業信息。 函數
當一個做業知足以下條件一、2或者條件一、3時,將被從數據結構jobs轉移到過時做業隊列中。 高併發
條件1 做業已經運行完成,即運行狀態爲SUCCESSED、FAILED或KILLED。 oop
條件2 做業完成時間距如今已經超過24小時(可經過參數mapred.jobtracker.retirejob.interval配置)。 this
條件3 做業擁有者已經完成做業總數超過100(可經過參數mapred.jobtracker.completeuserjobs.maximum配置)個。 spa
過時做業被統一保存到過時隊列中。當過時做業超過1000個(可經過參數mapred.job.tracker.retiredjobs.cache.size配置)時,將會從內存中完全刪除。
3) expireLaunchingTaskThread線程
該線程用於發現已經被分配給某個TaskTracker但一直未彙報信息的任務。當JobTracker將某個任務分配給TaskTracker後,若是該任務在10分鐘內未彙報進度,則JobTracker認爲該任務分配失敗,並將其狀態標註爲FAILED。
4) completedJobsStoreThread線程
該線程將已經運行完成的做業運行信息保存到HDFS上,並提供了一套存取這些信息的API。該線程可以解決如下兩個問題。
n 用戶沒法獲取好久以前的做業運行信息:前面提到線程retireJobsThread會清除長時間駐留在內存中的完成做業,這會致使用戶沒法查詢好久以前某個做業的運行信息。
n JobTracker重啓後做業運行信息丟失:當JobTracker因故障重啓後,全部本來保存到內存中的做業信息將會所有丟失。
該線程經過保存做業運行日誌的方式,使得用戶能夠查詢任意時間提交的做業和還原做業的運行信息。
默認狀況下,該線程不會啓用,能夠經過下表所示的幾個參數配置並啓用該線程。
配置參數 |
參數含義 |
mapred.job.tracker.persist.jobstatus.active |
是否啓用該線程 |
mapred.job.tracker.persist.jobstatus.hours |
做業運行信息保存時間 |
mapred.job.tracker.persist.jobstatus.dir |
做業運行信息保存路徑 |
在MapReduce中,JobTracker存在單點故障問題。若是它因異常退出後重啓,那麼全部正在運行的做業運行時信息將丟失。若是不採用適當的做業恢復機制對做業信息進行恢復,則全部做業需從新提交,且已經計算完成的任務需從新計算。這勢必形成資源浪費。
爲了解決JobTracker面臨的單點故障問 題,Hadoop設計了做業恢復機制,過程以下:做業從提交到運行結束的整個過程當中,JobTracker會爲一些關鍵事件記錄日誌(由 JobHistory類完成)。對於做業而言,關鍵事件包括做業提交、做業建立、做業開始運行、做業運行完成、做業運行失敗、做業被殺死等;對於任務而 言,關鍵事件包括任務建立、任務開始運行、任務運行結束、任務運行失敗、任務被殺死等。當JobTracker因故障重啓後(重啓過程當中,全部 TaskTracker仍然活着),若是管理員啓用了做業恢復功能(將參數mapred.jobtracker.restart.recover置爲 true),則JobTracker會檢查是否存在須要恢復運行狀態的做業,若是有,則經過日誌恢復這些做業的運行狀態(由 RecoveryManager類完成),並從新調度那些未運行完成的任務(包括產生部分結果的任務)。
心跳是溝通TaskTracker和JobTracker的橋樑,它其實是一個RPC函數。TaskTracker週期性地調用該函數彙報節點和任務狀態信息,從而造成心跳。在Hadoop中,心跳主要有三個做用:
n 判斷TaskTracker是否活着。
n 及時讓JobTracker獲取各個節點上的資源使用狀況和任務運行狀態。
n 爲TaskTracker分配任務。
TaskTracker週期性地調用RPC函數heartbeat向JobTracker彙報信息和領取任務。該函數定義以下:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted,boolean initialContact,boolean acceptNewTasks, short responseId)
該函數的各個參數含義以下。
status |
該參數封裝了TaskTracker上的各類狀態信息。包括: String trackerName;//TaskTracker名稱 String host;//TaskTracker主機名 int httpPort;//TaskTracker對外的HTTP端口號 int failures;//該TaskTracker上已經失敗的任務總數 List<TaskStatus> taskReports;//正在運行的各個任務運行狀態 volatile long lastSeen;//上次彙報心跳的時間 private int maxMapTasks;/*Map slot總數,即容許同時運行的Map Task總數,由參數mapred.tasktracker.map.tasks.maximum設定*/ private int maxReduceTasks;//Reduce slot總數 private TaskTrackerHealthStatus healthStatus;//TaskTracker健康狀態 private ResourceStatus resStatus;//TaskTracker資源(內存,CPU等)信息 |
restarted |
表示TaskTracker是否剛剛從新啓動。 |
initialContact |
表示TaskTracker是否初次鏈接JobTracker |
acceptNewTasks |
表示TaskTracker是否能夠接收新任務,這一般取決於slot是否有剩餘和節點健康狀態等。 |
responseId |
表示心跳響應編號,用於防止重複發送心跳。每接收一次心跳後,該值加1。 |
該函數的返回值爲一個HeartbeatResponse對象,該對象主要封裝了JobTracker向TaskTracker下達的命令,具體以下:
class HeartbeatResponse implements Writable, Configurable { ... ... short responseId; // 心跳響應編號 int heartbeatInterval; // 下次心跳的發送間隔 TaskTrackerAction[] actions; // 來自JobTracker的命令,可能包括殺死做業等 Set<JobID> recoveredJobs = new HashSet<JobID>(); // 恢復完成的做業列表。 ... ... }
該函數的內部實現邏輯主要分爲兩個步驟:更新狀態和下達命令。JobTracker首先將TaskTracker彙報的最新任務運行狀態保存到相應數據結構中,而後根據這些狀態信息和外界需求爲其下達相應的命令。
函數heartbeat首先會更新TaskTracker/Job/Task的狀態信息。相關代碼以下:
接下來,跟蹤進入函數processHeartbeat內部。該函數首先進行一系列異常狀況檢查,而後更新TaskTracker/Job/Task的狀態信息。相關代碼以下:
private synchronized boolean processHeartbeat( TaskTrackerStatus trackerStatus, boolean initialContact, long timeStamp) throws UnknownHostException { ... ... updateTaskStatuses(trackerStatus); // 更新Task狀態信息 updateNodeHealthStatus(trackerStatus, timeStamp); // 更新節點健康狀態 ... ... }
更新完狀態信息後,JobTracker要爲TaskTracker構造一個HeartbeatResponse對象做爲心跳應答。該對象主要有兩部份內容:下達給TaskTracker的命令和下次彙報心跳的時間間隔。下面分別對它們進行介紹:
1. 下達命令
JobTracker將下達給TaskTracker的命 令封裝成TaskTrackerAction類,主要包括ReinitTrackerAction(從新初始化)、LauchTaskAction(運行 新任務)、KillTaskAction(殺死任務)、KillJobAction(殺死做業)和CommitTaskAction(提交任務)五種。下 面依次對這幾個命令進行介紹。
1) ReinitTrackerAction
JobTracker收到TaskTracker發送過來 的心跳信息後,首先要進行一致性檢查,若是發現異常狀況,則會要求TaskTracker從新對本身進行初始化,以恢復到一致的狀態。當出現如下兩種不一 致狀況時,JobTracker會向TaskTracker下達ReinitTrackerAction命令。
n 丟失上次心跳應答信息:JobTracker會保存向 每一個TaskTracker發送的最近心跳應答信息,若是JobTracker未剛剛重啓且一個TaskTracker並不是初次鏈接 JobTracker(initialContact!=true),而最近的心跳應答信息丟失了,則這是一種不一致狀態。
n 丟失TaskTracker狀態信 息:JobTracker接收到任何一個心跳信息後,會將TaskTracker狀態(封裝在類TaskTrackerStatus中)信息保存起來。如 果一個TaskTracker非初次鏈接JobTracker但狀態信息卻不存在,則也是一種不一致狀態。
相關代碼以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... return new HeartbeatResponse(responseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); ... ... } }
2) LauchTaskAction
該類封裝了TaskTracker新分配的任務。 TaskTracker接收到該命令後會啓動一個子進程運行該任務。Hadoop將一個做業分解後的任務分紅兩大類:計算型任務和輔助型任務。其中,計算 型任務是處理實際數據的任務,包括Map Task和Reduce Task兩種(對應TaskType類中的MAP和REDUCE兩種類型),由專門的 任務調度器對它們進行調度;而輔助型任務則不會處理實際的數據,一般用於同步計算型任務或者清理磁盤上無用的目錄,包括job-setup task、 job-cleanup task和task-cleanup task三種(對應TaskType類中的JOB_SETUP,JOB_CLEANUP和 TASK_CLEANUP三種類型),其中,job-setup task和job-cleanup task分別用做計算型任務開始運行同步標識和結束 運行同步標識,而task-cleanup task則用於清理失敗的計算型任務已經寫到磁盤上的部分結果,這種任務由JobTracker負責調度,且 運行優先級高於計算型任務。
若是一個正常(不在黑名單中)的TaskTracker尚 有空閒slot(acceptNewTasks爲true),則JobTracker會爲該TaskTracker分配新任務,任務選擇順序是:先輔助型 任務,再計算型任務。而對於輔助型任務,選擇順序依次爲job-cleanup task、task-cleanup task和job- setup task,具體代碼以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); // 若是沒有輔助型任務,則選擇計算型任務 if (tasks == null ) { // 由任務調度器選擇一個或多個計算型任務 tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { expireLaunchingTasks.addNewTask(task.getTaskID()); if(LOG.isDebugEnabled()) { LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); } // 將分配的任務封裝成LauchTAskAction actions.add(new LaunchTaskAction(task)); } ... ... } ... ... }
3) KillTaskAction
該類封裝了TaskTracker需殺死的任務。TaskTracker收到該命令後會殺掉對應任務、清理工做目錄和釋放slot。致使JobTracker向TaskTracker發送該命令的緣由有不少,主要包括如下幾個場景:
n 用戶使用命令「bin/hadoop job -kill-task」或者「bin/hadoop job -fail-task」殺死一個任務或者使一個任務失敗。
n 啓用推測執行機制後,同一份數據可能同時由兩個Task Attempt處理。當其中一個Task Attempt執行成功後,另一個處理相同數據的Task Attempt將被殺掉。
n 某個做業運行失敗,它的全部任務將被殺掉。
n TaskTracker在必定時間內未彙報心跳,則JobTracker認爲其死掉,它上面的全部Task均被標註爲死亡。
相關代碼以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks to be killed List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); if (killTasksList != null) { actions.addAll(killTasksList); } ... ... } }
4) KillJobAciton
該類封裝了TaskTracker待清理的做業。TaskTracker接收到該命令後,會清理做業的臨時目錄。致使JobTracker向TaskTracker發送該命令的緣由有不少,主要包括如下幾個場景:
n 用戶使用命令「」或者「」殺死一個做業或者是使一個做業失敗。
n 做業運行完成,通知TaskTracker清理該做業的工做目錄。
n 做業運行失敗,即同一個做業失敗的Task數目超過必定比例。
相關代碼以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for jobs to be killed/cleanedup List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); if (killJobsList != null) { actions.addAll(killJobsList); } ... ... } }
5) CommitTaskAction
該類封裝了TaskTracker需提交的任務。爲了防止 同一個TaskInProgress的兩個同時運行的Task Attempt(好比打開推測執行功能,一個任務可能存在備份任務)同時打開一個文件或者 往一個文件中寫數據而產生衝突,Hadoop讓每一個Task Attempt寫到單獨一個文件(以TaskAttemptID命名,好比 attempt_201412031706_0008_r_000000_0)中。一般而言,Hadoop讓每一個Task Attempt成功運行完成 後,再將運算結果轉移到最終目錄${mapred.output.dir}中。Hadoop將一個成功運行完成的Task Attempt結果文件從臨時 目錄「提高」至最終目錄的過程,稱爲「任務提交」。當TaskInProgress中一個任務被提交後,其餘任務將被殺死,同時意味着該 TaskInProgress運行完成。相關代碼以下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks whose outputs can be saved List<TaskTrackerAction> commitTasksList = getTasksToSave(status); if (commitTasksList != null) { actions.addAll(commitTasksList); } ... ... } }
TaskTracker心跳時間間隔大小應該適度,若是過小,則JobTracker須要處理高併發的心跳鏈接請求,必然產生不小的併發壓力;若是太大,空閒的資源不能及時彙報給JobTracker(進而爲之分配新的Task),形成資源空閒,進而下降系統吞吐率。2. 調整心跳間隔
TaskTracker彙報心跳的時間間隔並非一成不變 的,它會隨着集羣規模的動態調整(好比節點死掉或者用戶動態添加新節點)而變化,以便可以合理利用JobTracker的併發處理能力。在 Hadoop MapReduce中,只有JobTracker知道某一時刻集羣的規模,所以由JobTracker爲每一個TaskTracker計算下 一次彙報心跳的時間間隔,並經過心跳機制告訴TaskTracker。
JobTracker容許用戶經過參數配置心跳的時間間隔 加速比,即每增長mapred.heartbeats.in.second(默認是100,最小是1)個節點,心跳時間間隔增長 mapreduce.jobtracker.heartbeats.scaling.factor(默認是1,最小是0.01)秒。同時,爲了防止用戶參 數設置不合理而對JobTracker產生較大負載,JobTracker要示心跳時間間隔至少爲3秒。具體計算方法以下:
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JobTrackerMXBean { ... ... /** * Calculates next heartbeat interval using cluster size. * Heartbeat interval is incremented by 1 second for every 100 nodes by default. * @return next heartbeat interval. */ public int getNextHeartbeatInterval() { // get the no of task trackers int clusterSize = getClusterStatus().getTaskTrackers(); int heartbeatInterval = Math.max( (int)(1000 * HEARTBEATS_SCALING_FACTOR * Math.ceil((double)clusterSize /NUM_HEARTBEATS_IN_SECOND)), HEARTBEAT_INTERVAL_MIN) ; return heartbeatInterval; } public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // calculate next heartbeat interval and put in heartbeat response int nextInterval = getNextHeartbeatInterval(); response.setHeartbeatInterval(nextInterval); ... ... } ... ... }