概述:MapReduce是hadoop的核心組件之一,能夠經過MapReduce很容易在hadoop平臺上進行分佈式的計算編程。本文組織結果以下:首先對MapReduce架構和基本原理進行概述,其次對整個MapReduce過程的生命週期進行詳細討論。 編程
參考文獻:董西城的《Hadoop技術內幕》以及若干論壇文章,沒法考證出處。 數組
MapReduce主要分爲Map和Reduce兩個過程,採用了M/S的設計架構。在1.0系列中,主要角色包括:Client, JobTracke, TaskTracker和Task 架構
Client:用戶須要執行的Job在Client進行配置,例如編寫MapReduce程序,指定輸入輸出路徑,指定壓縮比例等等,配置好後,由客戶端提交給JobTracker。每一個做業Job會被分紅若干個任務Task。 併發
在整個MapReduce執行過程當中,數據須要通過Mapper,中間過程shuffle,Reducer的處理。 app
Mapper主要執行MapTask,將分配給它的split解析爲若干個K-V對做爲map函數的輸入。例如在Wordcount程序中,K=字符串偏移量,V=一行字符串。而後依次調用map()進行處理,輸出仍爲K-V對。此時<K=word,V=1>。 分佈式
中間過程也分爲Map端和Reduce端執行 函數
在mapper端,每一個Map Task都有一個內存緩衝區,存儲着map函數的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個Map Task結束後再對磁盤中這個Map Task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉取數據。 oop
Partition spa
Partitioner根據map函數輸出的K-V對以及Reduce Task的數量來決定當前這對Map輸出的K-V由哪一個Reduce Task來處理。首先對key進行hash運算,再以Reduce task的數量對它取模;取模是爲了平均Reduce的處理能力,也能夠定製並設置到規定job上。 操作系統
Spill
對於map()函數的輸出數據K-V對,要寫入內存緩衝區,該內存緩衝區的做用是批量收集Map結果,減小IO的影響。再寫入磁盤文件。將K-V對序列化爲字節數組,而後將K-V對以及partition的分組結果寫入緩衝區。該緩衝區是有大小默認限制爲100M,因此當Map任務輸出結果過多時,須要對緩衝進行刷新,將數據寫入磁盤。向磁盤寫數據的過程稱爲Spill,意爲溢寫,由單獨的進程完成,不影響Map結果寫入的線程操做。爲了達到這個目的,經過設置溢寫比例spill.percent(默認0.8)來實現,即當緩衝區中數據達到80%,就啓動Spill進程,鎖定這80%的緩衝,進行溢寫過程;與此同時,其餘線程能夠繼續將Map的輸出結果寫入到剩下20%的緩衝區中,互不影響。
Sort
Spill線程啓動後,須要對將要寫入磁盤的數據進行處理,對已經序列化爲字節的key進行排序。因爲Map任務的結果要交給不一樣Reduce任務來處理,因此須要將交個同一個Reduce任務的數據合併在一塊兒。這個合併過程在寫入緩衝區時並未執行,而是由Spill進程在寫入到磁盤時進行合併。若是有不少的K-V對須要提交給一個Reduce任務,那麼應該將這些K-V進行拼接,減小與partitioner相關的索引記錄。<K=word, V=1,V=1,V=1>
Merge
每次Spill進行溢寫操做,都會在磁盤上產生一個溢寫文件。若是緩衝區不夠大或者Map輸出結果很大,那麼會屢次執行溢寫文件。因此須要將這些溢寫文件歸併爲一個文件,該過程稱爲Merge。Merge所作的操做就是未來自不一樣map task結果中,key相同的K-V歸併爲組,造成K-[v1,v2,v…]。由於是將多個文件合併爲一個文件,因此可能也有相同的Key存在,若是在client端設置過combiner,則會調用他來合併相同的Key。
至此,Map端的工做所有結束,最後這個文件放在TaskTracker可以獲取到的本地目錄內,每一個reduce task不斷經過RPC從JobTracker處獲取map任務是否完成的信息,若是獲知某臺Tasktracker上的map任務完成,則shuffle過程後半段開始啓動。
在Reduce端的中間過程,就是在reduce執行以前所進行的工做,不斷將各個map輸出的最終結果進行拉取,而後進行merge操做。
Copy
簡單地拉取數據。Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。
Merge
這裏的merge如map端的merge動做,只是數組中存放的是不一樣map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端的更爲靈活,它基於JVM的heap size設置,由於Shuffle階段Reducer不運行,因此應該把絕大部分的內存都給Shuffle用。這裏須要強調的是,merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認狀況下第一種形式不啓用。一樣的要在內存進行sort操做。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map 端相似,這也是溢寫的過程,這個過程當中若是設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的那個文件。
Reduce Task 讀取中間過程shuffle產生並放在HDFS上的最終文件,不斷地調用reduce()函數來處理輸入數據,該輸入數據格式爲<K=word,V=n,V=m…>,最後輸出到HDFS上。
本節對MapReduce做業的處理過程從提交到完成進行概述。做業處理的整個流程包括:
做業提交與初始化→任務調度與監控→運行環境準備→任務的執行→任務結束
用戶須要執行的做業在客戶端進行配置,例如編寫MapReduce程序,指定輸入輸出路徑,指定壓縮比例等等,配置好後,由客戶端提交給JobTracker。用戶提交做業後,首先由JobClient 實例將做業相關信息(好比將程序jar 包、做業配置文件、分片元信息文件等)上傳到HDFS上,其中,分片元信息文件記錄了每一個輸入分片的邏輯位置信息。而後JobClient經過RPC 通知JobTracker。JobTracker 收到新做業提交請求後,由做業調度模塊對做業進行初始化:爲做業建立一個JobInProgress 對象以跟蹤做業運行情況,而JobInProgress 則會爲每一個Task 建立一個TaskInProgress 對象以跟蹤每一個任務的運行狀態,TaskInProgress 可能須要管理多個"Task Attempt"
TaskTracker 週期性地經過Heartbeat 向JobTracker 彙報本節點的資源使用狀況,一旦出現空閒資源,JobTracker 會按照必定的策略選擇一個合適的任務使用該空閒資源,這由任務調度器完成。任務調度器是一個可插拔的獨立模塊,且爲雙層架構,即首先選擇做業,而後從該做業中選擇任務,其中,選擇任務時須要重點考慮數據本地性。此外,JobTracker 跟蹤做業的整個運行過程,併爲做業的成功運行提供全方位的保障。首先,當TaskTracker 或者Task 失敗時,轉移計算任務;其次,當某個Task 執行進度遠落後於同一做業的其餘Task 時,爲之啓動一個相同Task,並選取計算快的Task 結果做爲最終結果。
運行環境準備包括JVM 啓動和資源隔離, 均由TaskTracker 實現。TaskTracker 爲每一個Task 啓動一個獨立的JVM 以免不一樣Task 在運行過程當中相互影響;同時,TaskTracker 使用了操做系統進程實現資源隔離以防止Task 濫用資源。
TaskTracker 爲Task 準備好運行環境後,便會啓動Task。在運行過程當中,每一個Task 的最新進度首先由Task 經過RPC 彙報給TaskTracker,再由TaskTracker彙報給JobTracker。
JobTracker在接受到最後一個任務運行完成後,會將任務標誌爲成功。此時會作刪除中間結果等善後處理工做。
本文簡單討論總結了MapReduce的架構和做業的生命週期,若是有錯誤之處,還望指正。