1、MapReduce模型框架css
MapReduce是一個用於大規模數據處理的分佈式計算模型,最初由Google工程師設計並實現的,Google已經將完整的MapReduce論文公開發布了。其中的定義是,MapReduce是一個編程模型,是一個用於處理和生成大規模數據集的相關的實現。用戶定義一個map函數來處理一個Key-Value對以生成一批中間的Key-Value對,再定義一個reduce函數將全部這些中間的有相同Key的Value合併起來。不少現實世界中的任務均可用這個模型來表達。html
一、MapReduce模型node
![](http://static.javashuo.com/static/loading.gif)
源數據 中間數據 結果數據算法
MapReduce模型如上圖所示,Hadoop MapReduce模型主要有Mapper和Reducer兩個抽象類。Mapper端主要負責對數據的分析處理,最終轉化爲Key-Value的數據結構;Reducer端主要是獲取Mapper出來的結果,對結果進行統計。編程
二、MapReduce框架windows
![](http://static.javashuo.com/static/loading.gif)
整個過程如上圖所示,包含4個獨立的實體,以下所示:數組
- client:提交MapReduce做業,好比,寫的MR程序,還有CLI執行的命令等。
- jobtracker:協調做業的運行,就是一個管理者。
- tasktracker:運行做業劃分後的任務,就是一個執行者。
- hdfs:用來在集羣間共享存儲的一種抽象的文件系統。
說明:
其實,還有namenode就是一個元數據倉庫,就像windows中的註冊表同樣。secondarynamenode能夠當作namenode的備份。datanode能夠當作是用來存儲做業劃分後的任務。在DRCP中,master是namenode,secondarynamenode,jobtracker,其它的3臺slaver都是tasktracker,datanode,且tasktracker都須要運行在HDFS的datanode上面。
MapReduce框架中組成部分及它們之間的關係,以下所示:
運行在Hadoop上的MapReduce應用程序最基本的組成部分包括:一是Mapper抽象類,一是Reducer抽象類,一是建立JobConf的執行程序。
JobTracker是一個master服務,軟件啓動以後JobTracker接收Job,負責調度Job的每個子任務Task運行於TaskTracker上,而且監控它們的運行,若是發現有失敗的Task就從新運行它,通常狀況下應該把JobTracker部署在單獨的機器上。
TaskTracker是運行在多個節點上的slaver服務。TaskTracker主動與JobTracker通訊(與DataNode和NameNode類似,經過心跳來實現)接收做業,並負責直接執行每個任務。
每個Job都會在用戶端經過JobClient類將應用程序以及配置參數Configuration打包成JAR文件存儲在HDFS中,並把路徑提交到JobTracker的master服務,而後由master建立每個Task(即MapTask和ReduceTask)將它們分發到各個TaskTracker服務中去執行。
JobClient提交Job後,JobTracker會建立一個JobInProgress來跟蹤和調度這個Job,並把它添加到Job隊列之中。JobInProgress會根據提交的任務JAR中定義的輸入數據集(已分解成FileSplit)建立對應的一批TaskInProgress用於監控和調度MapTask,同時建立指定書目的TaskInProgress用於監控和調度ReduceTask,默認爲1個ReduceTask。
JobTracker啓動任務時經過每個TaskInProgress來運行Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到後會建立對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,做用相似)用於監控和調度該Task。啓動具體的Task進程經過TaskInProgress管理,經過TaskRunner對象來運行。TaskRunner會自動裝載任務JAR文件並設置好環境變量後,啓動一個獨立的Java Child進程來執行Task,即MapTask或者ReduceTask,但它們不必定運行在同一個TaskTracker中。
一個完整的Job會自動依次執行Mapper、Combiner(在JobConf指定Combiner時執行)和Reducer,其中Mapper和Combiner是由MapTask調用執行,Reduce則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據Job JAR中定義的輸入數據集<key1, value1>對讀入,處理完成生成臨時的<key2, value2>對,若是定義了Combiner,MapTask會在Mapper完成調用該Combiner將相同Key的值作合併處理,以減小輸出結果集。MapTask的任務所有完成後,交給ReduceTask進程調用Reducer處理,生成最終結果<Key3, value3>對。
2、MapReduce工做原理
![](http://static.javashuo.com/static/loading.gif)
一、做業的提交
JobClient的submitJob()方法實現的做業提交過程,以下所示:
- 經過JobTracker的getNewJobId()方法,向jobtracker請求一個新的做業ID。參見步驟2。
- 檢查做業的輸出說明,也就是說要指定輸出目錄的路徑,可是輸出目錄還不能存在(防止覆蓋輸出結果),若是不知足條件,就會將錯誤拋給MapReduce程序。
- 檢查做業的輸入說明,也就是說若是輸入路徑不存在,做業也無法提交,若是不知足條件,就會將錯誤拋給MapReduce程序。
- 將做業運行所需的資源,好比做業JAR文件、配置文件等複製到HDFS中。參見步驟3。
- 經過JobTracker的submitJob()方法,告訴jobtracker做業準備執行。參見步驟4。
二、做業的初始化
- JobTracker接收到對其submitJob()方法調用以後,就會把此調用放入一個內部隊列當中,交由做業調度器進行調度。(說明:Hadoop做業的調度器常見的有3個:先進先出調度器;容量調度器;公平調度器。Hadoop做業調度器採用的是插件機制,即做業調度器是動態加載的、可插拔的,同時第三方能夠開發本身的做業調度器,參考資料」大規模分佈式系統架構與設計實戰」)。參見步驟5。
- 初始化包括建立一個表示正在運行做業的對象——封裝任務的記錄信息,以便跟蹤任務的狀態和進程。參見步驟5。
- 接下來要建立運行任務列表,做業調度器首先從共享文件系統中獲取JobClient已計算好的輸入分片信息,而後爲每一個分片建立一個map任務(也就是說mapper的個數與分片的數目相同)。參見步驟6。(建立reduce任務的數量由JobConf的mapred.reduce.task屬性決定,它是用setNumReduceTasks()方法來設置的,而後調度器建立相應數量的要運行的reduce任務,默認狀況只有一個reducer)
三、任務的分配
- tasktracker自己運行一個簡單的循環來按期發送」心跳(heartbeat)」給jobtracker。什麼是心跳呢?就是tasktracker告訴jobtracker它是否還活着,同時心跳也充當二者之間的消息通訊,好比tasktracker會指明它是否已經作好準備來運行新的任務了,若是是,管理者jobtracker就會給執行者tasktracker分配一個任務。參見步驟7。
- 固然,在管理者jobtracker爲執行者tasktracker選擇任務以前,jobtracker必須先選定任務所在的做業。一旦選擇好做業,jobtracker就能夠給tasktracker選定一個任務。如何選擇一個做業呢?固然是Hadoop做業的調度器了,它就像是Hadoop的中樞神經系統同樣,默認的方法是簡單維護一個做業優先級列表。(對於調度算法的更深理解能夠學習操做系統的做業調度算法,進程調度算法,好比先來先服務(FCFS)調度算法,短做業優先(SJF)調度算法,優先級調度算法,高響應比優先調度算法,時間片輪轉調度算法,多級反饋隊列調度算法等。若是從更高的角度來看調度算法,實際上是一種控制和決策的策略選擇。)
四、任務的執行
- 做業選擇好了,任務也選擇好了,接下來要作的事情就是任務的運行了。首先,從HDFS中把做業的JAR文件複製到tasktracker所在的文件系統,同時,tasktracker將應用程序所須要的所有文件從分佈式緩存複製到本地磁盤,也就是從HDFS文件系統複製到ext4等文件系統之中。參見步驟8。
- tasktracker爲任務新建一個本地工做目錄,並把JAR文件中的內容解壓到這個文件夾中,新建一個TaskRunner實例來運行該任務。
- TaskRunner啓動一個新的JVM(參見步驟9)來運行每一個任務(參見步驟10),以便用戶定義的map和reduce函數的任何缺陷都不會影響TaskTracker守護進程(好比致使它崩潰或者掛起)。須要說明一點的是,對於map和reduce任務,tasktracker有固定數量的任務槽,準確數量由tasktracker核的數量和內存大小來決定,好比一個tasktracker可能同時運行兩個map任務和reduce任務。map任務和reduce任務中關於數據本地化部分再也不講解,由於DRCP沒有用到,只要理解本地數據級別就能夠了,好比node-local,rack-local,off-switch。
- 子進程經過umbilical接口與父進程進行通訊,任務的子進程每隔幾秒便告訴父進程它的進度,直到任務完成。
五、進度和狀態的更新
![](http://static.javashuo.com/static/loading.gif)
- MapReduce是Hadoop的一個離線計算框架,運行時間範圍從數秒到數小時,所以,對於咱們而言直到做業進展是很重要的。
- 一個做業和每一個任務都有一個狀態信息,包括做業或任務的運行狀態(好比,運行狀態,成功完成,失敗狀態)、Map和Reduce的進度、計數器值、狀態消息和描述(能夠由用戶代碼來設置)等。
- 這些消息經過必定的時間間隔由Child JVM—>TaskTracker—>JobTracker匯聚。JobTracker將產生一個代表全部運行做業及其任務狀態的全局視圖。能夠經過Web UI查看。同時JobClient經過每秒查詢JobTracker來得到最新狀態,輸出到控制檯上。
- 如今可能會有一個疑問,這些狀態信息在做業執行期間不斷變化,它們是如何與客戶端進行通訊的呢?詳細細節不在講解,參考資料《Hadoop權威指南》。
六、做業的完成
- 當jobtracker收到做業最後一個任務已完成的通知後,便把做業的狀態設置爲」成功」。而後,在JobClient查詢狀態時,便知道做業已成功完成,因而JobClient打印一條消息告知用戶,最後從runJob()方法返回。
說明:
MapReduce容錯,即做業失敗狀況再也不講解,參考資料《Hadoop權威指南》。
3、Shuffle階段和Sort階段
若是說以上是從物理實體的角度來說解MapReduce的工做原理,那麼以上即是從邏輯實體的角度來說解MapReduce的工做原理,以下所示:
- 輸入分片: 在進行map計算以前,mapreduce會根據輸入文件計算輸入分片,每一個輸入分片針對一個map任務,輸入分片存儲的並不是數據自己,而是一個分片長度和一個記錄數據位置的數組,輸入分片每每和hdfs的block關係很密切。假如咱們設定hdfs塊的大小是64MB,若是咱們有三個輸入文件,大小分別是3MB、65MB和127MB,那麼mapreduce會把3MB文件分爲一個輸入分片,65MB則是兩個輸入分片,而127MB也是兩個輸入分片,就會有5個map任務將執行。
- map階段: 就是編寫好的map函數,並且通常map操做都是本地化操做,也就是在數據存儲節點上進行。
- combiner階段: combiner階段是能夠選擇的,combiner本質也是一種reduce操做。Combiner是一個本地化的reduce操做,它是map運算的後續操做,主要是在map計算出中間文件後作一個簡單的合併重複key值的操做,好比,咱們對文件裏的單詞頻率作統計,若是map計算時候碰到一個hadoop單詞就會記錄爲1,這篇文章裏hadoop可能會出現屢次,那麼map輸出文件冗餘就會不少,所以在reduce計算前對相同的key作一個合併操做,文件就會變小,這樣就提升了寬帶的傳輸效率。可是combiner操做是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終結果,好比:若是計算只是求總數,最大值,最小值可使用combiner,可是若是作平均值計算使用combiner,那麼最終的reduce計算結果就會出錯。
- shuffle階段: 將map的輸出做爲reduce輸入的過程就是shuffle。通常mapreduce計算的都是海量數據,map輸出的時候不可能把全部文件都放到內存中進行操做,所以map寫入磁盤的過程十分的複雜,更況且map輸出的時候要對結果進行排序,內存開銷是很大的。map在作輸出的時候會在內存裏開啓一個環形內存緩衝區,這個緩衝區是專門用來輸出的,默認大小是100MB,而且在配置文件裏爲這個緩衝區設定了一個閥值,默認是0.80(這個大小和閥值都是能夠在配置文件裏進行配置的),同時map還會爲輸出操做啓動一個守護線程,若是緩衝區的內存達到了閥值的80%時候,這個守護線程就會把內容寫到磁盤上,這個過程叫spill。另外的20%內存能夠繼續寫入要寫進磁盤的數據,寫出磁盤和寫入內存操做是互不干擾的,若是緩存區被填滿了,那麼map就會阻塞寫入內存的操做,讓寫出磁盤操做完成後再繼續執行寫入內存操做。寫出磁盤前會有個排序操做,這個是在寫出磁盤操做的時候進行的,不是在寫入內存的時候進行的,若是還定義了combiner函數,那麼排序後還會執行combiner操做。每次spill操做也就是寫出磁盤操做的時候就會寫一個溢出文件,即在作map輸出的時候有幾回spill操做就會產生多少個溢出文件。這個過程裏還會有一個partitioner操做,其實partitioner操做和map階段的輸入分片很像,一個partitioner對應一個reduce做業,若是mapreduce操做只有一個reduce操做,那麼partitioner就只有一個。若是有多個reduce操做,那麼partitioner對應的就會有多個。所以,能夠把partitioner看做reduce的輸入分片。到了reduce階段就是合併map輸出文件,partitioner會找到對應的map輸出文件,而後進行復制操做,複製操做時reduce會開啓幾個複製線程,這些線程默認個數是5個(也能夠在配置文件中更改複製線程的個數),這個複製過程和map寫出磁盤的過程相似,也有閥值和內存大小,閥值同樣能夠在配置文件裏配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製的時候reduce還會進行排序操做和合並文件操做,這些操做完畢以後就會進行reduce計算。
- reduce階段: 和map函數同樣,是編寫好的reduce函數,最終結果是存儲在hdfs上的。
參考文獻:緩存
[1] MapReduce編程模型的要點: http://blog.sina.com.cn/s/blog_4a1f59bf0100tgqj.htmlmarkdown
[2] Hadoop權威指南(第三版)數據結構
[3] Hadoop應用開發技術詳解
[4] mapreduce中reducers個數設置: http://www.2cto.com/os/201312/263998.html
[5] 操做系統典型調度算法: http://see.xidian.edu.cn/cpp/html/2595.html
[6] MapReduce框架結構: http://www.cppblog.com/javenstudio/articles/43073.html
[7] MapReduce框架詳解: http://www.cnblogs.com/sharpxiajun/p/3151395.html