Hadoop0.2以前版本和以後版本在Job中有很大的改進,本次採用的版本是Hadoop1.1.2版本。java
如今做爲做業驅動器,能夠直接繼承Configured以及實現Tool,這種方式能夠很便捷的獲取啓動時候命令行中輸入的做業配置參數,常規的Job啓動以下:緩存
public class SortByHash extends Configured implements Tool { public int run(String[] args) throws Exception { //這裏面負責配置job屬性 Configuration conf=getConf(); String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs(); String tradeDir=paths[0]; String payDir=paths[1]; String joinDir=paths[2]; Job job=new Job(conf,"JoinJob"); job.setJarByClass(JoinMain.class); FileInputFormat.addInputPath(job, new Path(tradeDir)); FileInputFormat.addInputPath(job, new Path(payDir)); FileOutputFormat.setOutputPath(job, new Path(joinDir)); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(TextIntPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int exitCode=ToolRunner.run(new SortByHashPartitioner(), args); System.exit(exitCode); } }
因爲Tool的全部實現都須要實現Configurable,而Configured又是Configurable的具體實現,因此要同時繼承Configured和實現Tool,這樣就不須要實現Tool中定義的全部方法了。利用Tool接口來跑MapReduce,能夠在命令行中設置一些參數,比硬編碼好不少。網絡
注意利用Tool啓動做業基本方式以下:ToolRunner首先調用本身的靜態方法run,在該方法中會首先建立一個Configurable對象。而後調用GenericOptionsParser解析命令行參數,並設置給剛建立的Configurable對象。而後再次設置主類(這裏即SortByHashPartitioner)的setConf方法,最後調用主類的run方法執行。因此在run中要想使用命令行參數必須以下:app
Configurationconf = getConf();分佈式
Jobjob = newJob(conf);ide
hadoop0.2以後的做業啓動是調用job.waitForCompletion(true);的;而後就會進行做業的提交、執行、完成等操做oop
調用waitForCompletion具體的工做流:編碼
第一,調用submit提交做業spa
第二,當參數爲true的時候,調用monitorAndPrintJob來進行監聽做業的進度。命令行
做業提交即submit():
第一,打開一個JobTracker的鏈接,這裏會建立一個JobClient對象
jobClient= new JobClient((JobConf) getConfiguration());
這裏的Configuration在初始化建立job的時候就會主動建立的
第二,根據建立的JobClient來調用submitJobInternal()提交做業給系統。
這裏會對於命令行中的選項進行檢查
1)獲取一個做業編號JobID,jobSubmitClient.getNewJobId(),jobSubmitclient是一個JobSubmissionProtocol,JobTracker就是這個類的子類,在JobClient建立的時候就會new一個
2)獲取目錄的代理,將運行做業須要的資源jar文件,配置文件都複製到一個以做業ID命名的目錄下JobTracker文件系統中。
3)檢查做業的輸出說明。若是沒有指定的輸出目錄或者輸出目錄已經不存在,則不提交,返回錯誤
4)建立做業的輸入分片。若是分片沒法計算,如輸入路徑不存在,則不提交,報告錯誤。
5)將該做業寫入做業隊列中,而後將該文件寫入JobTracker的文件系統中。
6)全部都經過後,真正的提交做業,調用submitJob()告知JobTracker準備執行做業.
做業初始化
當JobTracker接受到submitJob()調用後,會將此調用放入內部隊列中queue,交由做業調度器(JobScheduler)進行調度,並對其進行初始化操做。
初始化主要是由做業調度器完成的,建立一個任務運行列表。做業調度器會首先從共享文件系統中獲取JobClient已經計算好的分片信息,而後爲每個分片建立一個Map任務,建立的reduce數量由mapred.reduce.task來決定,通常是經過setNumReduceTask()設定的。
任務的分配
tasktracker會按期的向jobtracker發送一個心跳告訴是否存活,也是兩個之間的通訊通道。這裏發送心跳的目的就是利用心跳來告知jobtracker,tasktracker還活着,會指明本身是否已經準備好運行新的任務,若是是,則jobtracker會爲它分配一個任務。這裏的tasktracker就利用週期性的循環來向jobtracker來「拉活」。
每個tasktracker有固定的map任務槽和reduce任務槽。
選擇一個map任務,jobtracker會考慮tasktracker的網絡位置,而且選擇一個距離其輸入分片最近的tasktracker。通常都遵循數據本地化或機架本地化。
選擇一個reduce任務,jobtracker簡單地從待運行的reduce任務列表中選取下一個來執行,不準要考慮數據的本地化。
任務的執行
當tasktracker初次被分配了一個任務後,就開始要運行該任務。
第一步,從共享文件系統將做業的JAR文件複製到tasktracker所在的文件系統,目的就是實現了做業的JAR文件本地化。而且將應用程序所須要的所有文件從分佈式緩存中複製到本地磁盤。
第二步,tasktracker爲分配的任務建立一個本地工做目錄,將JAR文件內容解壓到這個文件夾
第三步,tasktracker建立一個TaskRunner實例來運行該任務。TaskRunner啓動一個新的JVM來運行每一個任務。
任務進度的更新
monitorAndPrintJob(),這個方法就是實時的報告做業的運行狀況,以打印在控制檯上。這個會每隔1秒進行查看,利用的就是Thread.sleep(1000)來執行的。
若是任務報告了進度,則會設置一個標誌來代表任務狀態發生了變化。在tasktracker中,除了運行任務的線程外,還有個獨立的線程每隔3秒會檢測任務的狀態,若是已經設置,則告知tasktracker當前任務狀態。而tasktracker每隔5秒會發送心跳到jobtracker,這裏發送心跳的目的主要是報告tasktracker上運行的全部任務的狀態。
做業的完成
當jobtracker收到做業的最後一個任務已經完成的通知後,則就會把做業的狀態設置爲「成功」。此時JobClient會打印一條消息告知用戶做業已經完成了。Jobtracker和tasktracker都會清空做業的工做狀態。