Mapreduce是一個分佈式運算程序的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架。java
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。算法
設想一個海量數據場景下的wordcount需求:編程
單機版:內存受限,磁盤受限,運算能力受限
分佈式:
一、文件分佈式存儲(HDFS)
二、運算邏輯須要至少分紅2個階段(一個階段獨立併發,一個階段匯聚)
三、運算程序如何分發
四、程序如何分配運算任務(切片)
五、兩階段的程序如何啓動?如何協調?
六、整個程序運行過程當中的監控?容錯?重試?
可見在程序由單機版擴成分佈式時,會引入大量的複雜工做。爲了提升開發效率,能夠將分佈式程序中的公共功能封裝成框架,讓開發人員能夠將精力集中於業務邏輯。緩存
首先要說明的是Hadoop2.0以前和Hadoop2.0以後的區別:網絡
從圖上的user program開始,user program連接了MapReduce庫,實現了最基本的Map函數和Reduce函數。併發
一、MapReduce庫把的輸入文件劃分爲M份,即如圖左所示分紅了split0-4的分片,而後使用fork將用戶進程copy到集羣內其它機器上。 二、被分配了Map做業的Worker,開始讀取對應分片的輸入數據,Map做業從輸入數據中抽取出鍵值對,map()函數產生的中間鍵值對被緩存在內存中。 三、緩存的中間鍵值對會按期寫入本地磁盤,這些中間鍵值對的位置會被通報給Master,Master負責將信息轉發給Reduce Worker。 四、Reduce Worker將分配好的Reduce做業的中間鍵值對讀取後,並對它們進行排序,並將相同key的鍵值對彙集在一塊兒。 五、Reduce Worker遍歷排序好的鍵值並傳遞給reduce() 函數,經reduce() 函數計算後產生的輸出會添加到這個分區的輸出文件中。
一個Map/Reduce做業的輸入和輸出類型以下所示:app
(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
一、Input階段 數據以必定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可使用,在Job.setInputFormat能夠設置,也能夠自定義分片函數。 二、map階段 對輸入的(key,value)進行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進行設置。 三、Sort階段 對於Mapper的輸出進行排序,使用Job.setOutputKeyComparatorClass進行設置,而後定義排序規則。 四、Combine階段 這個階段對於Sort以後,對相同key的結果進行合併,使用Job.setCombinerClass進行設置,也能夠自定義Combine Class類。 五、Partition階段 將Mapper的中間結果按照key的範圍劃分爲R份(Reduce做業的個數),默認使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也能夠自定義劃分的函數,使用Job.setPartitionClass設置。 六、Reduce階段 對於Mapper階段的結果進行進一步處理,Job.setReducerClass進行設置自定義的Reduce類。 七、Output階段 Reducer輸出數據的格式。
一、結構 一個完整的MapReduce程序是這樣一個分佈式程序的通用框架,其應對以上問題的總體結構以下:框架
二、MapReduce運行流程解析
1) 一個MapReduce程序啓動的時候,最早啓動的是MRAppMaster,MRAppMaster啓動後根據本次job的描述信息,計算出須要的MapTask實例數量,而後向集羣申請機器啓動相應數量的MapTask進程。
2)MapTask進程啓動以後,根據給定的數據切片範圍進行數據處理,主體流程爲:分佈式
3) MRAppMaster監控到全部MapTask進程任務完成以後,會根據客戶指定的參數啓動相應數量的ReduceTask進程,並告知ReduceTask進程要處理的數據範圍(數據分區)
4)ReduceTask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺MapTask運行所在機器上獲取到若干個MapTask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,而後調用客戶指定的outputformat將結果數據輸出到外部存儲。函數
三、MapTask並行度決定機制
MapTask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,MapTask並行實例是否越多越好呢?其並行度又是如何決定呢?
3.一、MapTask並行度的決定機制
一個job的map階段並行度由客戶端在提交job時決定,而客戶端對map階段並行度的規劃的基本邏輯爲: 將待處理數據執行邏輯切片,而後每個split分配一個mapTask並行實例處理,這段邏輯及造成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成
3.二、ReduceTask並行度的決定
ReduceTask的並行度一樣影響整個job的執行併發度和執行效率,但與MapTask的併發數由切片數決定不一樣,ReduceTask數量的決定是能夠直接手動設置:
//默認值是1
手動設置爲4 job.setNumReduceTasks(4);
若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜。
(注意: ReduceTask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask 。儘可能不要運行太多的ReduceTask,對大多數job來講,最好reduce的個數最多和集羣中的reduce持平,或者比集羣的reduce slots小)
3.三、mapreduce的shuffle機制
1)概述 MapReduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle。 shuffle的核心機制:數據分區,排序,緩存。具體來講:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程當中,對數據按key進行了分區和排序。
分區partition(肯定哪一個數據進入哪一個reduce)
Sort根據key排序
Combiner進行局部value的合併
2)詳細流程
一、 MapReduce收集咱們的map()方法輸出的kv對,放到內存緩衝區中 二、 從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件 三、 多個溢出文件會被合併成大的溢出文件 四、 在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序 五、 reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據 六、 reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將 這些文件再進行合併(歸併排序) 七、 合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快 。緩衝區的大小能夠經過參數調整, 參數:io.sort.mb 默認100M。
一個mapreduce做業的執行流程是:做業提交->做業初始化->任務分配->任務執行->更新任務執行進度和狀態->做業完成。
一個完整的mapreduce做業流程,包括4個獨立的實體:
客戶端:client,編寫mapreduce程序,配置做業,提交做業。
JobTracker:協調這個做業的運行,分配做業,初始化做業,與
TaskTracker進行通訊。
TaskTracker:負責運行做業,保持與JobTracker進行通訊。
HDFS:分佈式文件系統,保持做業的數據和結果。
一、提交做業
JobClient使用runjob方法建立一個JobClient實例,而後調用submitJob()方法進行做業的提交,提交做業的具體過程以下:
1) 經過調用JobTracker對象的getNewJobId()方法從JobTracker處得到一個做業ID。 2) 檢查做業的相關路徑。若是輸出路徑存在,做業將不會被提交(保護上一個做業運行結果)。 3) 計算做業的輸入分片,若是沒法計算,例如輸入路徑不存在,做業將不被提交,錯誤返回給mapreduce程序。 4) 將運行做業所需資源(做業jar文件,配置文件和計算獲得的分片)複製到HDFS上。 5) 告知JobTracker做業準備執行(使用JobTracker對象的submitJob()方法來真正提交做業)。
二、做業初始化
三、任務的分配
TaskTracker和JobTracker之間的通訊和任務分配是經過心跳機制完成的。TaskTracker做爲一個單獨的JVM,它執行一個簡單的循環,主要實現每隔一段時間向JobTracker發送心跳,告訴JobTracker此TaskTracker是否存活,是否準備執行新的任務。若是有待分配的任務,它就會爲TaskTracker分配一個任務。
四、任務的執行
五、更新任務的執行進度和狀態
六、任務完成 當Job完成後,JobTracker會收一個Job Complete的通知,並將當前的Job狀態更新爲successful。同時JobClient也會輪循獲知提交的Job已經完成,將信息顯示給用戶。最後,JobTracker會清理和回收該Job的相關資源,並通知TaskTracker進行相同的操做(好比刪除中間結果文件)