MapReduce做業提交源碼分析 app
咱們在編寫MapReduce程序的時候,首先須要編寫Map函數和Reduce函數。完成mapper和reducer的編寫後,進行Job的配置;Job配置完成後,調用Job.submit()方法完成做業的提交。那咱們思考一下,Job最終如何完成做業(job)的提交呢?粗略想一下,Job必然須要經過某種方式鏈接到JobTracker,由於只有這樣才能將job提交到JobTracker上進行調度執行。還須要考慮一下,咱們本身編寫的mapper和reducer,即Jar文件如何傳送到JobTracker上呢?其中有一種最簡單也比較直觀的方法,直接經過socket傳輸給JobTracker,由JobTracker再傳輸給TaskTracker(注意:MapReduce並無採用這種方法)。第三個須要考慮的內容是,JobTracker如何將用戶做業的配置轉化成map task和reduce task。下面咱們來分析一下MapReduce這些功能的實現。 socket
首先在class Job內部經過JobClient完成做業的提交,最終由JobClient完成與JobTracker的交互功能。在JobClient的構造函數中,經過調用RPC完成與JobTracker鏈接的創建。 函數
完成創建後,JobClient首先肯定job相關文件的存放位置(咱們上面提到mapreduce沒有采用將jar即其餘文件傳輸給JobTracker的方式,而是將這些文件保存到HDFS當中,而且能夠根據用戶的配置存放多份)。至於該存放目錄的分配是經過調用RPC訪問JobTracker的方法來進行分配的,下面看一下JobTracker的分配代碼: oop
final Path stagingRootDir = new Path(conf.get( 源碼分析
"mapreduce.jobtracker.staging.root.dir", spa
"/tmp/hadoop/mapred/staging")); hadoop
final FileSystem fs = stagingRootDir.getFileSystem(conf); get
return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString(); 源碼
注意上面代碼所生成的stagingRootDir是全部job文件的存放目錄,是一個根目錄,並不單指當前job。 it
完成job存放目錄的分配後,JobClient向JobTracker申請一個JobID(經過RPC,注意基本上JobClient與JobTracker的全部通訊都是經過RPC完成的,若是下文沒有顯示著名也應該屬於這種狀況)。
JobID jobId = jobSubmitClient.getNewJobId();
下面是JobTracker.getNewJobId的具體實現:
public synchronized JobID getNewJobId() throws IOException {
return new JobID(getTrackerIdentifier(), nextJobId++);
}
得到JobID後,將該JobID與上面的stagingRootDir組合就構成了Job文件的具體存放地址的構建。進行這些相關工做後,JobClient將相關的文件存儲到HDFS當中。