JobTracker是hadoop的重要的後臺守護進程之一,主要的功能是管理任務調度、管理TaskTracker、監控做業執行、運行做業容錯機制等。web
首先啓動interTrackerServer,將端口配置爲mapred.job.tracker綁定的地址和端口。interTrackerServer提供兩種用途:數組
接收和處理TaskTracker的heartbeat請求,必須實現InterTrackerProtocol接口及協議。緩存
接收和處理JobClient請求,submitJob、killJob等,必須實現JobSubmissionProtocol接口及協議。安全
其次啓動一個infoServer,運行StatusHttpServer,提供web服務。網絡
最後,啓動5個JobTracker子線程,各子線程功能以下:分佈式
ExpireLaunchingTasks:用於中止那些未在超時內報告進度的Task
ExpireTrackers:用於中止那些可能已經當掉的TaskTracker
RetireJobs:用於清除那些已經完成很長時間還存在隊列裏的做業
JobInitThread:用於初始化用戶做業
TaskCommitQueue:用於調度Task的那些全部與FileSystem操做相關的處理,並記錄Task的狀態信息。
JobTracker的啓動分析:函數
主要有兩個函數startTracker和offerService()函數。oop
一、JobTracker.startTracker()spa
這個函數在啓動時被調用主要有兩個步驟:線程
步驟一、構造JobTracker對象result,啓動兩個RPC服務,並等待JobTracker退出安全模式,若是構造出錯,睡眠1秒
步驟二、調用靜態方法JobEndNotifier.startNotifier()建立一個線程,從延遲隊列JobEndNotifier.queue中取出一個JobEndStatusInfo對象,而後經過sendNotification()調用httpNotification()構造一個HttpClient對象執行相應的http請求。
二、JobTracker.offerService()
負責建立啓動5個重要的線程及其回收線程資源。
TaskTracker的主要任務是執行JobTracker分發的任務。
TaskTracker啓動分析:
TaskTracker.TaskTracker()-->TaskTracker.run()
askTracker.TaskTracker()用於構造TaskTracker類的對象,在上述代碼中調用了TaskTracker(conf)含參構造函數,首先爲TaskTracker設置JobTracker地址,而後配置啓動一個StatusHttpServer對象,將服務綁定到做業配置項tasktracker.http.prot中指定的地址和端口,提供web服務,用於向用戶提供web界面查詢任務執行狀況的服務。最後調用initialize()完成TaskTracker初始化。
TaskTracker.run()是TaskTracker的主線程核心函數,調用TaskTracker.offerService()連接JobTracker並開始提供服務。若是出現網絡故障,需等待5s,而後重試,根據異常類型可能還需先調用TaskTracker.close完成清理,將全部任務置爲失敗,關閉RPC server,將TaskTracker.running置爲false,清理本地的Map計算輸出,關閉取Map計算結果的線程mapEventsFetcher等,清理完成後調用initialize()從新初始化。
TaskTracker核心子線程:
MapEventsFetcher線程、taskCleanup線程、TaskRunner線程。
在hadoop的master啓動的時候會開啓一個IPC Server以等待Slave的心跳數據包,Slave啓動時會主動連接master,並週期性的每一個3s向master發送一個心跳包。slave經過這個心跳包將本身的狀態告訴master,而後master在經過心跳包的返回值向slave節點傳送執行指令,整個過程就是hadoop的心跳檢測機制。
做業的建立主要由JobClient類負責完成。JobClient也是用戶做業和JobTracker交互的重要接口,能夠用來提交做業、跟蹤做業的狀態、訪問子任務的報告、日誌等,獲取mapreduce集羣狀態信息等。
JobClient在建立做業時執行的主要操做包括:檢查輸入/輸出的有效性,計算做業的Splits,複製做業的jar包和配置文件到HDFS的mapred系統目錄,最後提交做業給JobTracker並跟蹤做業執行狀態。整個做業的建立都在JobClient.runJob()函數中執行。
建立流程:
步驟一、首先根據傳入的JobConf參數構造JobClient對象,在JobClient的構造函數中會調用JobClient的init方法,經過其鏈接到JobTracker。
步驟二、調用JobClient.submitJob()向JobTracker提交做業,根據返回的RunningJob接口每隔1s檢查一次做業的狀態,若是執行完畢退出跟蹤,若是出錯,殺死該做業。
初始化:
步驟一、從配置文件讀取mapred.job.tracker,判斷是否爲本地執行的任務,初始化JobClient的JobSubmitClient。
步驟二、若是是本地任務,調用LocalJobRunner來初始化。
步驟三、若是非本地任務,使用RPC機制來構造一個JobSubmissionProtocol接口的代理,即調用JobTracker.getAddress(conf)得到JobTracker地址,再經過JobClient參數調用createProxy方法初始化JobSubmitClient。
做業提交:
提交是經過在JobClient.runJob()函數中調用JobClient.submitJob()函數完成的。最終經過調用JobClient.submitJobInternal()函數提交到做業JobTracker,則該函數返回NetworkedJob的RunningJob對象用於跟蹤做業。主要的執行流程以下:
步驟一、經過jobSubmitClient.getNewJobId()獲取做業名。
步驟二、得到做業提交目錄submitJobDit,並設置參數mapreduce.job.dir的值。
步驟三、獲取Job的分佈式緩存路徑。
步驟四、獲取做業配置文件目錄,獲取Reduce數目,以及本機Ip地址,並根據Reduce數目是否爲零來檢查輸入/輸出設置。
步驟五、爲用戶做業建立Split輸入分區。
步驟六、將JobConf的mapred.job.split.file項配置爲job.split在HDFS上的絕對路徑,根據splits[]數組大小設置Map任務數。
步驟七、獲得做業隊列名並設置ACL隊列管理信息。
步驟八、將JobConf的內容寫到HDFS的/${maprede.system.dir}/${jobid}/job.xml中。
步驟九、最後經過jobSubmitClient.submitJob()將名爲JobId的做業提交給JobTracker。返回status變量是JobStatus對象,用於跟蹤做業狀態。
在完成做業建立過程後再JobClient.submitJob()函數中經過JobSubmissionProtocol協議調用JobTracker.submitJob()函數提交做業到JobTracker,在submitJob()函數中主要負責用戶的用戶做業的初始化、構造JobInProgress對象,並初始化任務列表等,而任務的真正執行實際是由TaskTracker完成的。
JobTracker初始化
在submitJob()函數被調用以後,JobTracker就會接收到新的job請求,而後建立一個JobInProgress對象並經過它來來管理和調度任務。JobInProgress在建立的時候會初始化一系列與任務有關的參數。
一、JobTracker.submitJob()
submitJob()是JobTracker初始化的主函數。主要流程以下:
步驟一、獲得用戶組信息,返回做業狀態,新建jobInfo。
步驟二、建立JobInProgress對象,不鎖定JobTracker。
步驟三、覈對隊列是否處於running狀態,並檢查做業訪問權限。
步驟四、檢查做業是否由於無效的內存需求而不能運行。
步驟五、經過調用addJob(jobID,job)函數提交做業。該函數會返回JobStatus對象用於跟蹤做業的運行狀態。
二、JobInProgress.JobInProgress()
這個函數在JobTracker.submitJob()初始化做業中構造JobInProgress對象時調用執行。
步驟一、建立並初始化用於向JobClient彙報做業執行狀態的JobStatus對象。
步驟二、設置用戶名信息以及該JobInProgress對象的啓動時間。
步驟三、在joobTrackerbTracker本地文件系統的${mapred.local.dir}/目錄下建立${jobid}.jar,${jobid}.xml和${job.id}目錄,並在該目錄下建立job.xml文件。
步驟四、從做業配置JobConf中讀取做業優先級、隊列信息,Map任務數,Reduce任務數等。
步驟五、建立taskCompletionEvents列表。該列表用於JobTracker上跟蹤該做業完成事件,初始化大小爲Map任務數+Reduce任務數+10
步驟六、構建jobACLs,用於對用戶做業進行ACL權限控制。
步驟七、設置Map、Reduce,以及每一個Tracker上任務能夠容忍失敗的百分比。
步驟八、檢查每一個reduce的估計輸入大小是否小於reduce大小的限制值,最後註冊做業。
TaskTracker.startNewTask()
初始化完成以後,經過該函數啓動一個新的任務。在心跳機制檢測中,若是JobTracker返回了LaunchTaskAction指令,則TaskTracker在offerService中會調用TaskTracker.startNewTask()函數來處理新任務,即開始執行一個新的計算任務。
TaskTracker.localizeJob()
該函數在TaskTracker.startNewTask()中調用,主要負責計算任務初始化(本地化)並啓動計算任務。
TaskRunner.run()
最終執行TaskRunner.run函數,TaskTracker最終調用TaskTracker.TaskInProgress.launchTask()建立計算線程來執行。