MapReduce做業提交源碼分析

MapReduce做業提交源碼分析 app

咱們在編寫MapReduce程序的時候,首先須要編寫Map函數和Reduce函數。完成mapper和reducer的編寫後,進行Job的配置;Job配置完成後,調用Job.submit()方法完成做業的提交。那咱們思考一下,Job最終如何完成做業(job)的提交呢?粗略想一下,Job必然須要經過某種方式鏈接到JobTracker,由於只有這樣才能將job提交到JobTracker上進行調度執行。還須要考慮一下,咱們本身編寫的mapper和reducer,即Jar文件如何傳送到JobTracker上呢?其中有一種最簡單也比較直觀的方法,直接經過socket傳輸給JobTracker,由JobTracker再傳輸給TaskTracker(注意:MapReduce並無採用這種方法)。第三個須要考慮的內容是,JobTracker如何將用戶做業的配置轉化成map task和reduce task。下面咱們來分析一下MapReduce這些功能的實現。 socket

首先在class Job內部經過JobClient完成做業的提交,最終由JobClient完成與JobTracker的交互功能。在JobClient的構造函數中,經過調用RPC完成與JobTracker鏈接的創建。 函數

完成創建後,JobClient首先肯定job相關文件的存放位置(咱們上面提到mapreduce沒有采用將jar即其餘文件傳輸給JobTracker的方式,而是將這些文件保存到HDFS當中,而且能夠根據用戶的配置存放多份)。至於該存放目錄的分配是經過調用RPC訪問JobTracker的方法來進行分配的,下面看一下JobTracker的分配代碼: oop

final Path stagingRootDir = new Path(conf.get( 源碼分析

"mapreduce.jobtracker.staging.root.dir", spa

"/tmp/hadoop/mapred/staging")); hadoop

final FileSystem fs = stagingRootDir.getFileSystem(conf); get

return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString(); 源碼

注意上面代碼所生成的stagingRootDir是全部job文件的存放目錄,是一個根目錄,並不單指當前job。 it

完成job存放目錄的分配後,JobClient向JobTracker申請一個JobID(經過RPC,注意基本上JobClient與JobTracker的全部通訊都是經過RPC完成的,若是下文沒有顯示著名也應該屬於這種狀況)。

JobID jobId = jobSubmitClient.getNewJobId();

下面是JobTracker.getNewJobId的具體實現:

public synchronized JobID getNewJobId() throws IOException {

return new JobID(getTrackerIdentifier(), nextJobId++);

             }

得到JobID後,將該JobID與上面的stagingRootDir組合就構成了Job文件的具體存放地址的構建。進行這些相關工做後,JobClient將相關的文件存儲到HDFS當中。

相關文章
相關標籤/搜索