(二)Hadoop之MapReduce原理分析

簡介

Mapreduce是一個分佈式運算程序的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架。java

Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。算法

爲何要MapReduce

  1. 海量數據在單機上處理由於硬件資源限制,沒法勝任
  2. 而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度
  3. 引入MapReduce框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理

設想一個海量數據場景下的wordcount需求:編程

單機版:內存受限,磁盤受限,運算能力受限
分佈式:
一、文件分佈式存儲(HDFS)
二、運算邏輯須要至少分紅2個階段(一個階段獨立併發,一個階段匯聚)
三、運算程序如何分發
四、程序如何分配運算任務(切片)
五、兩階段的程序如何啓動?如何協調?
六、整個程序運行過程當中的監控?容錯?重試?

可見在程序由單機版擴成分佈式時,會引入大量的複雜工做。爲了提升開發效率,能夠將分佈式程序中的公共功能封裝成框架,讓開發人員能夠將精力集中於業務邏輯。緩存

MapReduce並行處理的基本過程

首先要說明的是Hadoop2.0以前和Hadoop2.0以後的區別:網絡

  • 2.0以前只有MapReduce的運行框架,它裏面只有兩種節點,一是master,二是worker。master既作資源調度又作程序調度,worker只是用來參與計算的。
  • 但在2.0以後加入了Yarn集羣,Yarn集羣的主節點承擔了資源調度,Yarn集羣的從節點中會挑選出一個節點(由RedourceManager決定)來進行應用程序的資源調度,做用相似於2.0以前的master的工做。(資源調度: 處理程序所須要的cpu、內存資源,以及存儲數據所須要的硬盤資源)

從圖上的user program開始,user program連接了MapReduce庫,實現了最基本的Map函數和Reduce函數。
image.png併發

一、MapReduce庫把的輸入文件劃分爲M份,即如圖左所示分紅了split0-4的分片,而後使用fork將用戶進程copy到集羣內其它機器上。
二、被分配了Map做業的Worker,開始讀取對應分片的輸入數據,Map做業從輸入數據中抽取出鍵值對,map()函數產生的中間鍵值對被緩存在內存中。
三、緩存的中間鍵值對會按期寫入本地磁盤,這些中間鍵值對的位置會被通報給Master,Master負責將信息轉發給Reduce Worker。
四、Reduce Worker將分配好的Reduce做業的中間鍵值對讀取後,並對它們進行排序,並將相同key的鍵值對彙集在一塊兒。
五、Reduce Worker遍歷排序好的鍵值並傳遞給reduce() 函數,經reduce() 函數計算後產生的輸出會添加到這個分區的輸出文件中。

MapRrduce輸入與輸出問題

image.png

  • Map/Reduce框架運轉在<key, value>鍵值對上,也就是說,框架把做業的輸入看爲是一組<key, value>鍵值對,一樣也產出一組 <key, value>鍵值對作爲做業的輸出,這兩組鍵值對的類型可能不一樣。
  • 框架須要對key和value的類進行序列化,所以這些類都須要實現Writable接口(Writable接口是一個實現列化協議的序列化對象,序列化最主要的做用就是持久化存儲或者是用於網絡傳輸)。另外,爲了方便框架執行排序操做,key類必須實現WritableComparable接口。

一個Map/Reduce做業的輸入和輸出類型以下所示:app

(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce實際處理流程

  • MapReduce 能夠當作是分治算法的一種體現。所謂分治算法就是「就是分而治之 ,將大的問題分解爲相同類型的子問題(最好具備相同的規模),對子問題進行求解,而後合併成大問題的解。
  • MapReduce 就是分治法的一種,將輸入進行分片,而後交給不一樣的task進行處理,而後合併成最終的解。
  • MapReduce 實際的處理過程能夠理解爲Input->Map->Sort->Combine->Partition->Reduce->Output。
一、Input階段
數據以必定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可使用,在Job.setInputFormat能夠設置,也能夠自定義分片函數。
二、map階段
對輸入的(key,value)進行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進行設置。
三、Sort階段
對於Mapper的輸出進行排序,使用Job.setOutputKeyComparatorClass進行設置,而後定義排序規則。
四、Combine階段
這個階段對於Sort以後,對相同key的結果進行合併,使用Job.setCombinerClass進行設置,也能夠自定義Combine Class類。
五、Partition階段
將Mapper的中間結果按照key的範圍劃分爲R份(Reduce做業的個數),默認使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也能夠自定義劃分的函數,使用Job.setPartitionClass設置。
六、Reduce階段
對於Mapper階段的結果進行進一步處理,Job.setReducerClass進行設置自定義的Reduce類。
七、Output階段
Reducer輸出數據的格式。

MapReduce框架結構及核心運行機制

一、結構 一個完整的MapReduce程序是這樣一個分佈式程序的通用框架,其應對以上問題的總體結構以下:框架

  • MRAppMaster:負責整個程序的過程調度及狀態協調(Hadoop2.0以後就不同了)
  • MapTask:負責map階段的整個數據處理流程
  • ReduceTask:負責reduce階段的整個數據處理流程

二、MapReduce運行流程解析
1) 一個MapReduce程序啓動的時候,最早啓動的是MRAppMaster,MRAppMaster啓動後根據本次job的描述信息,計算出須要的MapTask實例數量,而後向集羣申請機器啓動相應數量的MapTask進程。
2)MapTask進程啓動以後,根據給定的數據切片範圍進行數據處理,主體流程爲:分佈式

  • 利用客戶指定的inputformat來獲取RecordReader讀取數據,造成輸入KV對
  • 將輸入KV對傳遞給客戶定義的map()方法,作邏輯運算,並將map()方法輸出的KV對收集到緩存
  • 將緩存中的KV對按照Key分區排序後不斷溢寫到磁盤文件

3) MRAppMaster監控到全部MapTask進程任務完成以後,會根據客戶指定的參數啓動相應數量的ReduceTask進程,並告知ReduceTask進程要處理的數據範圍(數據分區)
4)ReduceTask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺MapTask運行所在機器上獲取到若干個MapTask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,而後調用客戶指定的outputformat將結果數據輸出到外部存儲。函數

三、MapTask並行度決定機制
MapTask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,MapTask並行實例是否越多越好呢?其並行度又是如何決定呢?

3.一、MapTask並行度的決定機制
一個job的map階段並行度由客戶端在提交job時決定,而客戶端對map階段並行度的規劃的基本邏輯爲: 將待處理數據執行邏輯切片,而後每個split分配一個mapTask並行實例處理,這段邏輯及造成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成

3.二、ReduceTask並行度的決定
ReduceTask的並行度一樣影響整個job的執行併發度和執行效率,但與MapTask的併發數由切片數決定不一樣,ReduceTask數量的決定是能夠直接手動設置:

//默認值是1
手動設置爲4 job.setNumReduceTasks(4);

若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜。
(注意: ReduceTask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask 。儘可能不要運行太多的ReduceTask,對大多數job來講,最好reduce的個數最多和集羣中的reduce持平,或者比集羣的reduce slots小)

3.三、mapreduce的shuffle機制
image.png

1)概述 MapReduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle。 shuffle的核心機制:數據分區,排序,緩存。具體來講:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程當中,對數據按key進行了分區和排序

分區partition(肯定哪一個數據進入哪一個reduce)
Sort根據key排序
Combiner進行局部value的合併

2)詳細流程

一、 MapReduce收集咱們的map()方法輸出的kv對,放到內存緩衝區中 二、 從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件 
三、 多個溢出文件會被合併成大的溢出文件 
四、 在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序 
五、 reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據 
六、 reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將 這些文件再進行合併(歸併排序) 
七、 合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)

Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快 。緩衝區的大小能夠經過參數調整, 參數:io.sort.mb 默認100M。

一個job的運行流程

一個mapreduce做業的執行流程是:做業提交->做業初始化->任務分配->任務執行->更新任務執行進度和狀態->做業完成。

一個完整的mapreduce做業流程,包括4個獨立的實體:

客戶端:client,編寫mapreduce程序,配置做業,提交做業。
JobTracker:協調這個做業的運行,分配做業,初始化做業,與
TaskTracker進行通訊。
TaskTracker:負責運行做業,保持與JobTracker進行通訊。
HDFS:分佈式文件系統,保持做業的數據和結果。

image.png

一、提交做業
JobClient使用runjob方法建立一個JobClient實例,而後調用submitJob()方法進行做業的提交,提交做業的具體過程以下:

1) 經過調用JobTracker對象的getNewJobId()方法從JobTracker處得到一個做業ID。 
2) 檢查做業的相關路徑。若是輸出路徑存在,做業將不會被提交(保護上一個做業運行結果)。 
3) 計算做業的輸入分片,若是沒法計算,例如輸入路徑不存在,做業將不被提交,錯誤返回給mapreduce程序。 
4) 將運行做業所需資源(做業jar文件,配置文件和計算獲得的分片)複製到HDFS上。 
5) 告知JobTracker做業準備執行(使用JobTracker對象的submitJob()方法來真正提交做業)。

二、做業初始化

  • 當JobTracker收到Job提交的請求後,將Job保存在一個內部隊列,並讓Job Scheduler(做業調度器)處理並初始化。
  • 初始化涉及到建立一個封裝了其tasks的job對象,並保持對task的狀態和進度的跟蹤(步驟5)。
  • 當建立要運行的一系列task對象後,Job Scheduler首先開始從文件系統中獲取由JobClient計算的input splits(步驟6),而後再爲每一個split建立map task。

三、任務的分配
TaskTracker和JobTracker之間的通訊和任務分配是經過心跳機制完成的。TaskTracker做爲一個單獨的JVM,它執行一個簡單的循環,主要實現每隔一段時間向JobTracker發送心跳,告訴JobTracker此TaskTracker是否存活,是否準備執行新的任務。若是有待分配的任務,它就會爲TaskTracker分配一個任務。

四、任務的執行

  • TaskTracker申請到新的任務以後,就要在本地運行了。首先,是將任務本地化(包括運行任務所需的數據、配置信息、代碼等),即從HDFS複製到本地,調用localizeJob()完成的。
  • 對於使用Streaming和Pipes建立Map或者Reduce程序的任務,Java會把key/value傳遞給外部進程,而後經過用戶自定義的Map或者Reduce進行處理,而後把key/value傳回到java中。其中就好像是TaskTracker的子進程在處理Map和Reduce代碼同樣。

五、更新任務的執行進度和狀態

  • 進度和狀態是經過heartbeat(心跳機制)來更新和維護的。
  • 對於Map Task,進度就是已處理數據和全部輸入數據的比例。
  • 對於Reduce Task,狀況就有點複雜,包括3部分,拷貝中間結果文件、排序、reduce調用,每部分佔1/3。

六、任務完成 當Job完成後,JobTracker會收一個Job Complete的通知,並將當前的Job狀態更新爲successful。同時JobClient也會輪循獲知提交的Job已經完成,將信息顯示給用戶。最後,JobTracker會清理和回收該Job的相關資源,並通知TaskTracker進行相同的操做(好比刪除中間結果文件)

相關文章
相關標籤/搜索