MapReduce是咱們再進行離線大數據處理的時候常常要使用的計算模型,MapReduce的計算過程被封裝的很好,咱們只用使用Map和Reduce函數,因此對其總體的計算過程不是太清楚,同時MapReduce1.0和MapReduce2.0在網上有不少人混淆。java
Input可是輸入文件的存儲位置,python
,它也能夠是本機上的文件位置。
咱們來仔細分析下input算法
首先咱們知道要和JobTracker打交道是離不開JobClient這個接口的,就如上圖所示,bash
而後JobClient中的Run方法 會讓 JobClient 把全部 Hadoop Job 的信息,好比 mapper reducer jar path, mapper / reducer class name, 輸入文件的路徑等等,告訴給 JobTracker,以下面的代碼所示:網絡
public int run(String[] args) throws Exception { //create job Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); // set run jar class job.setJarByClass(this.getClass()); // set input . output FileInputFormat.addInputPath(job, new Path(PropReader.Reader("arg1"))); FileOutputFormat.setOutputPath(job, new Path(PropReader.Reader("arg2"))); // set map job.setMapperClass(HFile2TabMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // set reduce job.setReducerClass(PutSortReducer.class); return 0; }
除此之外,JobClient.runJob() 還會作一件事,使用 InputFormat類去計算如何把 input 文件 分割成一份一份,而後交給 mapper 處理。inputformat.getSplit() 函數返回一個 InputSplit 的 List, 每個 InputSplit 就是一個 mapper 須要處理的數據。app
一個 Hadoop Job的 input 既能夠是一個很大的 file, 也能夠是多個 file; 不管怎樣,getSplit() 都會計算如何分割 input.分佈式
inputFormat它其實是個 interface, 須要 類 來繼承,提供分割 input 的邏輯。函數
Jobclient 有一個方法叫 setInputFormat(), 經過它,咱們能夠告訴 JobTracker 想要使用的 InputFormat 類 是什麼。若是咱們不設置,Hadoop默認的是 TextInputFormat, 它默認爲文件在 HDFS上的每個 Block 生成一個對應的 InputSplit. 因此你們使用 Hadoop 時,也能夠編寫本身的 input format, 這樣能夠自由的選擇分割 input 的算法,甚至處理存儲在 HDFS 以外的數據。oop
對於每一個 map任務, 咱們知道它的 split 包含的數據所在的主機位置,咱們就把 mapper 安排在那個相應的主機上好了,至少是比較近的host. 你可能會問:split 裏存儲的 主機位置是 HDFS 存數據的主機,和 MapReduce 的主機 有什麼相關呢?爲了達到數據本地性,其實一般把MapReduce 和 HDFS 部署在同一組主機上。測試
既然一個 InputSplit 對應一個 map任務, 那麼當 map 任務收到它所處理數據的位置信息,它就能夠從 HDFS 讀取這些數據了。
map函數接受的是一個 key value 對。
RecordReader 能夠被定義在每一個 InputFormat 類中。當咱們經過 JobClient.setInputFormat() 告訴 Hadoop inputFormat 類名稱的時候, RecordReader 的定義也一併被傳遞過來。
因此整個Input,
1.JobClient輸入輸入文件的存儲位置
2.JobClient經過InputFormat接口能夠設置分割的邏輯,默認是按HDFS文件分割。
3.Hadoop把文件再次分割爲key-value對。
4.JobTracker負責分配對應的分割塊由對應的maper處理,同時 RecordReader負責讀取key-value對值。
JobClient運行後得到所需的配置文件和客戶端計算所得的輸入劃分信息。並將這些信息都存放在JobTracker專門爲該做業建立的文件夾中。文件夾名爲該做業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);
而後輸入劃分信息告訴了JobTracker應該爲這個做業啓動多少個map任務等信息。
JobTracker經過TaskTracker 向其彙報的心跳狀況和slot(狀況),每個slot能夠接受一個map任務,這樣爲了每一臺機器map任務的平均分配,JobTracker會接受每個TaskTracker所監控的slot狀況。
JobTracker接收到做業後,將其放在一個做業隊列裏,等待做業調度器對其進行調度,看成業調度器根據本身的調度算法調度到該做業時,會根據輸入劃分信息爲每一個劃分建立一個map任務,並將map任務分配給TaskTracker執行,分配時根據slot的狀況做爲標準。
TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着不少的信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。
Shuffle是咱們不須要編寫的模塊,但倒是十分關鍵的模塊。
在map中,每一個 map 函數會輸出一組 key/value對, Shuffle 階段須要從全部 map主機上把相同的 key 的 key value對組合在一塊兒,(也就是這裏省去的Combiner階段)組合後傳給 reduce主機, 做爲輸入進入 reduce函數裏。
Partitioner組件 負責計算哪些 key 應當被放到同一個 reduce 裏
HashPartitioner類,它會把 key 放進一個 hash函數裏,而後獲得結果。若是兩個 key 的哈希值 同樣,他們的 key/value對 就被放到同一個 reduce 函數裏。咱們也把分配到同一個 reduce函數裏的 key /value對 叫作一個reduce partition.
咱們看到 hash 函數最終產生多少不一樣的結果, 這個 Hadoop job 就會有多少個 reduce partition/reduce 函數,這些 reduce函數最終被JobTracker 分配到負責 reduce 的主機上,進行處理。
咱們知道map階段可能會產生多個spill file 當 Map 結束時,這些 spill file 會被 merge 起來,不是 merge 成一個 file,而是也會按 reduce partition 分紅多個。
當 Map tasks 成功結束時,他們會通知負責的 tasktracker, 而後消息經過 jobtracker 的 heartbeat 傳給 jobtracker. 這樣,對於每個 job, jobtracker 知道 map output 和 map tasks 的關聯。Reducer 內部有一個 thread 負責按期向 jobtracker 詢問 map output 的位置,直到 reducer 獲得全部它須要處理的 map output 的位置。
Reducer 的另外一個 thread 會把拷貝過來的 map output file merge 成更大的 file. 若是 map task 被 configure 成須要對 map output 進行壓縮,那 reduce 還要對 map 結果進行解壓縮。當一個 reduce task 全部的 map output 都被拷貝到一個它的 host上時,reduce 就要開始對他們排序了。
排序並非一次把全部 file 都排序,而是分幾輪。每輪事後產生一個結果,而後再對結果排序。最後一輪就不用產生排序結果了,而是直接向 reduce 提供輸入。這時,用戶提供的 reduce函數 就能夠被調用了。輸入就是 map 任務 產生的 key value對.
reduce() 函數以 key 及對應的 value 列表做爲輸入,按照用戶本身的程序邏輯,經合併 key 相同的 value 值後,產 生另一系列 key/value 對做爲最終輸出寫入 HDFS