MapReduce實現原理詳解

MR簡介

一個MR做業一般會把輸入的數據集切分爲若干獨立的數據塊,先由Map任務並行處理,而後MR框架對Map的輸出先進行排序,而後把結果做爲Reduce任務的輸入。MR框架是一種主從框架,由一個單獨的JobTracker節點和多個TaskTracker節點組成。(JobTracker至關於Master,負責做業任務的調度,TaskTracker至關於Slave,負責執行Master指派的任務)緩存

MR實現的操做流程

如上圖所示,具體MR的具體步驟可描述以下:併發

步驟1:app

把輸入文件分紅M塊(每一塊大小Hadoop默認是64M,可修改)。框架

步驟2:函數

master選擇空閒的執行者worker節點,把總共M個Map任務和R個Reduce任務分配給他們,如上圖中(2)所示。oop

步驟3:spa

一個分配了Map任務的worker讀取並處理輸入數據塊。從數據塊中解析出key/value鍵值對,把他們傳遞給用戶自定義的Map函數,由Map函數生成並輸出中間key/value鍵值對,暫時緩存在內存中,如上圖中(3)所示。命令行

步驟4:線程

緩存中的key/value鍵值對經過分區函數分紅R個區域,以後週期性地寫入本地磁盤上。並把本地磁盤上的存儲位置回傳給master,由master負責把這些存儲位置傳送給Reduce worker,如上圖中(4)所示。對象

步驟5:

當Reduce worker接收到master發來的存儲位置後,使用RPC協議從Map worker所在主機的磁盤上讀取數據。在獲取全部中間數據後,經過對key排序使得相同具備key的數據彙集在一塊兒。如上圖中(5)所示。

步驟6:

Reduce worker程序對排序後的中間數據進行遍歷,對每個惟一的中間key,Reduce worker程序都會將這個key對應的中間value值的集合傳遞給用戶自定義的Reduce函數,完成計算後輸出文件(每一個Reduce任務產生一個輸出文件)。如上圖中(6)所示。

做業提交流程

步驟1:命令行提交。用戶使用Hadoop命令行腳本提交MR程序到集羣。

步驟2:做業上傳。這一步驟包括了不少初始化工做,如獲取用戶做業的JobId,建立HDFS目錄,上傳做業、相關依賴庫等到HDFS上。

步驟3:產生切分文件。

步驟4:提交做業到JobTracker。

Mapper階段解讀

Mapper的輸入文件位於HDFS上,先對輸入數據切分,每個split分塊對應一個Mapper任務,經過RecordReader對象從輸入分塊中讀取並生成鍵值對,而後執行Map函數,輸出的中間鍵值對被partion()函數區分並寫入緩衝區,同時調用sort()進行排序。

Reducer階段解讀

Reducer主要有三個階段:Shuffle、Sort、Reduce

1. Shuffle階段:

Reducer的輸入就是Mapper階段已經排好序的輸出。在這個階段,框架爲每一個Reducer任務得到全部Mapper輸出中與之相關的分塊,把Map端的輸出結果傳送到Reduce端,大量操做是數據複製(所以也稱數據複製階段)。

2. Sort階段:

框架按照key對Reducer的輸入進行分組(Mapper階段時每個Map任務對於它自己的輸出結果會有一個排序分組,而不一樣Map任務的輸出中可能會有相同的key,所以要再一次分組)。Shuffle和Sort是同時進行的,Map的輸出也是一邊被取回一邊被合併。排序是基於內存和磁盤的混合模式進行,通過屢次Merge才能完成排序。(PS:若是兩次排序分組規則須要不一樣,能夠指定一個Comparator比較器來控制分組規則)。

3. Reduce階段:

經過Shuffle和Sort操做後獲得的<key, (list of values)>被送到Reducer的reduce()函數中執行,針對每個<key, (list of values)>會調用一次reduce()函數。

數據流向分析

步驟1:

輸入文件從HDFS到Mapper節點。通常狀況下,存儲數據的節點就是Mapper運行節點,避免數據在節點之間的傳輸(讓存儲靠近計算)。但總會有節點之間的數據傳輸存在,這時Hadoop會從離計算節點更近的存儲節點上傳輸數據。

步驟2:

Mapper輸出到內存緩衝區。Mapper的輸出不是直接寫到本地文件系統,而是先寫入內存緩衝區,緩衝區達到必定閾值後以臨時文件的形式寫入本地磁盤,當整個map任務結束後把全部臨時文件合併,產生最終的輸出文件。(Partioner就發生在該階段,寫入緩衝區的同時執行Partioner對文件進行分區)。

步驟3:

從內存緩衝區到本地磁盤。緩衝區大小默認是100M,須要在必定條件下將緩衝區中的數據臨時寫入磁盤,而後從新利用這塊緩衝區,這個過程稱做spill(溢寫),由單獨線程來完成,且不影響往緩衝區寫map結果的線程。溢寫線程啓動時不該該阻止map的結果輸出,因此整個緩衝區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩衝區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啓動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還能夠往剩下的20MB內存中寫,互不影響。 當溢寫線程啓動後,須要對這80MB空間內的key作排序(Sort)。

步驟4:

從Mapper端的本地文件系統到Reduce端。也就是Shuffle階段,分三種狀況:

  1. Mapper輸出的臨時文件遠程複製到相應的Reduce節點,如上圖中4-1。
  2. Mapper節點的機器有Reduce槽位,Mapper輸出的臨時文件能夠直接寫 入本機Reduce的緩衝區,如上圖中4-2。
  3. 本機Reduce還會接收來自其餘Mapper節點的臨時文件,如上圖中4-3。

步驟5:

從Reduce端內存緩衝區流向Reduce端的本地磁盤。這個過程就是Merge和Sort階段。Merge包括內存文件的合併(5-1)以及磁盤文件合併(5-2),同時Sort以key爲鍵進行排序分組,產生輸出文件。

步驟6:

Merge和Sort以後直接流向Reduce函數進行歸約處理。

步驟7:

根據用戶指定的輸出類型寫入HDFS中,產生part-*形式的輸出文件。

整體處理流程分析

步驟1:

JobClient類中會把用戶應用程序的Mapper類、Reducer類以及配置文件JobConf打包成一個JAR文件保存到HDFS中(上圖b所示),JobClient在提交做業的同時把這個JAR文件的路徑一塊兒提交到JobTracker的master服務(做業調度器),如上圖1所示。

步驟2:

JobClient提交Job後,JobTracker會建立一個JobInProgress來跟蹤和調度這個Job做業,並將其添加到調度器的做業隊列中,如上圖2所示。

步驟3:

JobInProgress會根據提交的做業JAR文件中定義的輸入數據集建立相應數量的TaskInProgress用於監控和調度MapTask,同時建立指定數量的TaskInProgress用於監控和調度ReduceTask,默認爲1個ReduceTask,如上圖3所示。

步驟4:

JobTracker經過TaskInProgress來啓動做業任務(如上圖4),這時會把Task對象(MapTask和ReduceTask)序列化寫入相應的TaskTracker服務中(如上圖5)。

步驟5:

TaskTracker收到後建立對應的TaskInProgress(不是JobTracker中的TaskInProgress,但做用相似)來監控和調度運行該Task,如上圖中6所示。

步驟6:

啓動具體的Task進程,TaskTracker經過TaskInProgress管理的TaskRunner對象來運行Task,如圖中7所示。

步驟7:

TaskRunner自動裝載用戶做業JAR文件,啓動一個獨立Java子進程來執行Task,TaskRunner會先執行MapTask,如上圖中8所示。

步驟8:

MapTask先調用Mapper,生成中間鍵值對,若是用戶還定義了Combiner,再調用Combiner,把相同key的value作歸約處理,減小Map輸出的鍵值對集合。

步驟9:

MapTask的任務所有完成後,TaskRunner再調用ReduceTask進行來啓動Reducer(如上圖中11),注意MapTask和ReduceTask不必定運行在同一個TaskTracker節點中。

步驟10:

ReduceTask直接調用Reducer類處理Mapper的輸出結果,生成最終結果,寫入HDFS中。

Shuffle階段解讀

Mapper的輸出是寫入本地磁盤的,而且是按照partion進行分區的,Reduce的輸入是集羣上多個Mapper輸出數據中同一partion段的數據,Map任務可能會在不一樣時間完成,只要其中一個Map任務完成了,ReduceTask任務就開始複製它的輸出,這個階段稱爲Shuffle階段或者Copy階段。Reduce任務能夠由多個線程併發複製,默認是5個線程,能夠經過參數改變。

ReduceTask如何知道從哪些TaskTrackers中獲取Mapper 的輸出?經過心跳檢測機制,當Mapper完成任務後會通知TaskTracker,而後TaskTracker經過心跳機制將任務完成狀態和結果數據傳輸給JobTracker,JobTracker會保存一個Mapper輸出和TaskTrackers的映射關係表,Reducer中有一個線程會間歇地向JobTracker詢問Mapper輸出的地址,直到全部數據都取完。取完後TaskTrackers不會當即刪除這些數據,由於Reducer可能會失敗,在整個做業完成後JobTracker告知它們要刪除的時候纔去刪除。

相關文章
相關標籤/搜索