MapReduce工做的基本流程

     Hadoop0.2以前版本和以後版本在Job中有很大的改進,本次採用的版本是Hadoop1.1.2版本。java

     如今做爲做業驅動器,能夠直接繼承Configured以及實現Tool,這種方式能夠很便捷的獲取啓動時候命令行中輸入的做業配置參數,常規的Job啓動以下:緩存


public class SortByHash extends Configured implements Tool
{
   public int run(String[] args) throws Exception {
    //這裏面負責配置job屬性
        Configuration conf=getConf();
        String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs();
        String tradeDir=paths[0];
        String payDir=paths[1];
    String joinDir=paths[2];
    Job job=new Job(conf,"JoinJob");
    job.setJarByClass(JoinMain.class);
    FileInputFormat.addInputPath(job, new Path(tradeDir));
    FileInputFormat.addInputPath(job, new Path(payDir));
    FileOutputFormat.setOutputPath(job, new Path(joinDir));
                                                                                                                                                          
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);
    job.setMapOutputKeyClass(TextIntPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.waitForCompletion(true);
    int exitCode=job.isSuccessful()?0:1;
    return exitCode;
   }
   public static void main(String[] args) throws Exception
   {
      // TODO Auto-generated method stub
      int exitCode=ToolRunner.run(new SortByHashPartitioner(), args);
      System.exit(exitCode);
   }
}


因爲Tool的全部實現都須要實現Configurable,Configured又是Configurable的具體實現,因此要同時繼承Configured和實現Tool,這樣就不須要實現Tool中定義的全部方法了。利用Tool接口來跑MapReduce,能夠在命令行中設置一些參數,比硬編碼好不少。網絡

      注意利用Tool啓動做業基本方式以下:ToolRunner首先調用本身的靜態方法run,在該方法中會首先建立一個Configurable對象。而後調用GenericOptionsParser解析命令行參數,並設置給剛建立的Configurable對象。而後再次設置主類(這裏即SortByHashPartitioner)setConf方法,最後調用主類的run方法執行。因此在run中要想使用命令行參數必須以下:app

       Configurationconf = getConf();分佈式

       Jobjob = newJob(conf);ide

       hadoop0.2以後的做業啓動是調用job.waitForCompletion(true);的;而後就會進行做業的提交、執行、完成等操做oop

調用waitForCompletion具體的工做流:編碼

第一,調用submit提交做業spa

第二,當參數爲true的時候,調用monitorAndPrintJob來進行監聽做業的進度。命令行

做業提交即submit():

第一,打開一個JobTracker的鏈接,這裏會建立一個JobClient對象

jobClient= new JobClient((JobConf) getConfiguration());

這裏的Configuration在初始化建立job的時候就會主動建立的

第二,根據建立的JobClient來調用submitJobInternal()提交做業給系統。

這裏會對於命令行中的選項進行檢查

1)獲取一個做業編號JobIDjobSubmitClient.getNewJobId()jobSubmitclient是一個JobSubmissionProtocolJobTracker就是這個類的子類,JobClient建立的時候就會new一個

2)獲取目錄的代理,將運行做業須要的資源jar文件,配置文件都複製到一個以做業ID命名的目錄下JobTracker文件系統中。

3)檢查做業的輸出說明。若是沒有指定的輸出目錄或者輸出目錄已經不存在,則不提交,返回錯誤

4)建立做業的輸入分片。若是分片沒法計算,如輸入路徑不存在,則不提交,報告錯誤。

5)將該做業寫入做業隊列中,而後將該文件寫入JobTracker的文件系統中

6)全部都經過後,真正的提交做業,調用submitJob()告知JobTracker準備執行做業.

做業初始化

JobTracker接受到submitJob()調用後,會將此調用放入內部隊列中queue,交由做業調度器(JobScheduler)進行調度,並對其進行初始化操做。

初始化主要是由做業調度器完成的,建立一個任務運行列表。做業調度器會首先從共享文件系統中獲取JobClient已經計算好的分片信息,而後爲每個分片建立一個Map任務,建立的reduce數量由mapred.reduce.task來決定,通常是經過setNumReduceTask()設定的。

任務的分配

tasktracker會按期的向jobtracker發送一個心跳告訴是否存活,也是兩個之間的通訊通道。這裏發送心跳的目的就是利用心跳來告知jobtrackertasktracker還活着,會指明本身是否已經準備好運行新的任務,若是是,則jobtracker會爲它分配一個任務。這裏的tasktracker就利用週期性的循環來向jobtracker來「拉活」。

每個tasktracker有固定的map任務槽和reduce任務槽。

選擇一個map任務,jobtracker會考慮tasktracker的網絡位置,而且選擇一個距離其輸入分片最近的tasktracker。通常都遵循數據本地化或機架本地化。

選擇一個reduce任務,jobtracker簡單地從待運行的reduce任務列表中選取下一個來執行,不準要考慮數據的本地化。

任務的執行

tasktracker初次被分配了一個任務後,就開始要運行該任務。

第一步,從共享文件系統將做業的JAR文件複製到tasktracker所在的文件系統,目的就是實現了做業的JAR文件本地化。而且將應用程序所須要的所有文件從分佈式緩存中複製到本地磁盤。

第二步,tasktracker爲分配的任務建立一個本地工做目錄,將JAR文件內容解壓到這個文件夾

第三步,tasktracker建立一個TaskRunner實例來運行該任務。TaskRunner啓動一個新的JVM來運行每一個任務。

任務進度的更新

monitorAndPrintJob(),這個方法就是實時的報告做業的運行狀況,以打印在控制檯上。這個會每隔1秒進行查看,利用的就是Thread.sleep(1000)來執行的。

若是任務報告了進度,則會設置一個標誌來代表任務狀態發生了變化。在tasktracker中,除了運行任務的線程外,還有個獨立的線程每隔3秒會檢測任務的狀態,若是已經設置,則告知tasktracker當前任務狀態。而tasktracker每隔5秒會發送心跳到jobtracker,這裏發送心跳的目的主要是報告tasktracker上運行的全部任務的狀態。

做業的完成

jobtracker收到做業的最後一個任務已經完成的通知後,則就會把做業的狀態設置爲「成功」。此時JobClient會打印一條消息告知用戶做業已經完成了。Jobtrackertasktracker都會清空做業的工做狀態。

相關文章
相關標籤/搜索