本篇文章主要從mapreduce運行做業的過程,shuffle,以及mapreduce做業失敗的容錯幾個方面進行詳解。html
MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想,都是從函數式編程語言裏借來的,還有從矢量編程語言裏借來的特性。它極大地方便了編程人員在不會分佈式並行編程的狀況下,將本身的程序運行在分佈式系統上。 當前的軟件實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定併發的Reduce(歸約)函數,用來保證全部映射的鍵值對中的每個共享相同的鍵組。 ---來源於百度百科java
MapReduce是一個基於集羣的高性能並行計算平臺(Cluster Infrastructure)
MapReduce是一個並行計算與運行軟件框架(Software Framework)
MapReduce是一個並行程序設計模型與方法(Programming Model & Methodology)node
mapreduce是hadoop中一個批量計算的框架,在整個mapreduce做業的過程當中,包括從數據的輸入,數據的處理,數據的數據輸入這些部分,而其中數據的處理部分就要map,reduce,combiner等操做組成。在一個mapreduce的做業中一定會涉及到以下一些組件:算法
一、客戶端,提交mapreduce做業
二、yarn資源管理器,負責集羣上計算資源的協調
三、yarn節點管理器,負責啓動和監控集羣中機器上的計算容器(container)
四、mapreduce的application master,負責協調運行mapreduce的做業
五、hdfs,分佈式文件系統,負責與其餘實體共享做業文件sql
做業的運行過程主要包括以下幾個步驟:編程
1、做業的提交 2、做業的初始化 3、做業任務的分配 4、做業任務的執行 5、做業執行狀態更新 6、做業完成
具體做業執行過程的流程圖以下圖所示:緩存
做業提交源碼分析詳情見:hadoop2.7之做業提交詳解(上) hadoop2.7之做業提交詳解(下)網絡
在MR的代碼中調用waitForCompletion()方法,裏面封裝了Job.submit()方法,而Job.submit()方法裏面會建立一個JobSubmmiter對象。當咱們在waitForCompletion(true)時,則waitForCompletion方法會每秒輪詢做業的執行進度,若是發現與上次查詢到的狀態有差異,則將詳情打印到控制檯。若是做業執行成功,就顯示做業計數器,不然將致使做業失敗的記錄輸出到控制檯。併發
其中JobSubmmiter實現的大概過程以下:
一、向資源管理器resourcemanager提交申請,用於一個mapreduce做業ID,如圖步驟2所示
二、檢查做業的輸出配置,判斷目錄是否已經存在等信息
三、計算做業的輸入分片的大小
四、將運行做業的jar,配置文件,輸入分片的計算資源複製到一個以做業ID命名的hdfs臨時目錄下,做業jar的複本比較多,默認爲10個(經過參數mapreduce.client.submit.file.replication控制),
五、經過資源管理器的submitApplication方法提交做業app
一、當資源管理器經過方法submitApplication方法被調用後,便將請求傳給了yarn的調度器,而後調度器在一個節點管理器上分配一個容器(container0)用來啓動application master(主類是MRAppMaster)進程。該進程一旦啓動就會向resourcemanager註冊並報告本身的信息,application master而且能夠監控map和reduce的運行狀態。所以application master對做業的初始化是經過建立多個薄記對象以保持對做業進度的跟蹤。
二、application master接收做業提交時的hdfs臨時共享目錄中的資源文件,jar,分片信息,配置信息等。並對每個分片建立一個map對象,以及經過mapreduce.job.reduces參數(做業經過setNumReduceTasks()方法設定)肯定reduce的數量。
三、application master會判斷是否使用uber(做業與application master在同一個jvm運行,也就是maptask和reducetask運行在同一個節點上)模式運行做業,uber模式運行條件:map數量小於10個,1個reduce,且輸入數據小於一個hdfs塊
能夠經過參數:
mapreduce.job.ubertask.enable #是否啓用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map數
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數
mapreduce.job.ubertask.maxbytes #ubertask最大做業大小
四、application master調用setupJob方法設置OutputCommiter,FileOutputCommiter爲默認值,表示創建作的最終輸出目錄和任務輸出的臨時工做空間
一、在application master判斷做業不符合uber模式的狀況下,那麼application master則會向資源管理器爲map和reduce任務申請資源容器。
二、首先就是爲map任務發出資源申請請求,直到有5%的map任務完成時,纔會爲reduce任務所需資源申請發出請求。
三、在任務的分配過程當中,reduce任務能夠在任何的datanode節點運行,可是map任務執行的時候須要考慮到數據本地化的機制,在給任務指定資源的時候每一個map和reduce默認爲1G內存,能夠經過以下參數配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
application master提交申請後,資源管理器爲其按需分配資源,這時,application master就與節點管理器通訊來啓動容器。該任務由主類YarnChild的一個java應用程序執行。在運行任務以前,首先將所需的資源進行本地化,包括做業的配置,jar文件等。接下來就是運行map和reduce任務。YarnChild在單獨的JVM中運行。
每一個做業和它的每一個任務都有一個狀態:做業或者任務的狀態(運行中,成功,失敗等),map和reduce的進度,做業計數器的值,狀態消息或描述看成業處於正在運行中的時候,客戶端能夠直接與application master通訊,每秒(能夠經過參數mapreduce.client.progressmonitor.pollinterval設置)輪詢做業的執行狀態,進度等信息。
當application master收到最後一個任務已完成的通知,便把做業的狀態設置爲成功。
在job輪詢做業狀態時,知道任務已經完成,而後打印消息告知用戶,並從waitForCompletion()方法返回。
看成業完成時,application master和container會清理中間數據結果等臨時問題。OutputCommiter的commitJob()方法被調用,做業信息由做業歷史服務存檔,以便用戶往後查詢。
mapreduce確保每一個reduce的輸入都是按照鍵值排序的,系統執行排序,將map的輸入做爲reduce的輸入過程稱之爲shuffle過程。shuffle也是咱們優化的重點部分。shuffle流程圖以下圖所示:
在生成map以前,會計算文件分片的大小:計算源碼詳見:hadoop2.7做業提交詳解之文件分片
而後會根據分片的大小計算map的個數,對每個分片都會產生一個map做業,或者是一個文件(小於分片大小*1.1)生成一個map做業,而後經過自定的map方法進行自定義的邏輯計算,計算完畢後會寫到本地磁盤。
在這裏不是直接寫入磁盤,爲了保證IO效率,採用了先寫入內存的環形緩衝區,並作一次預排序(快速排序)。緩衝區的大小默認爲100MB(可經過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入內存緩衝區的大小到達必定比例時,默認爲80%(可經過mapreduce.map.sort.spill.percent配置項修改),將啓動一個溢寫線程將內存緩衝區的內容溢寫到磁盤(spill to disk),這個溢寫線程是獨立的,不影響map向緩衝區寫結果的線程,在溢寫到磁盤的過程當中,map繼續輸入到緩衝中,若是期間緩衝區被填滿,則map寫會被阻塞到溢寫磁盤過程完成。溢寫是經過輪詢的方式將緩衝區中的內存寫入到本地mapreduce.cluster.local.dir目錄下。在溢寫到磁盤以前,咱們會知道reduce的數量,而後會根據reduce的數量劃分分區,默認根據hashpartition對溢寫的數據寫入到相對應的分區。在每一個分區中,後臺線程會根據key進行排序,因此溢寫到磁盤的文件是分區且排序的。若是有combiner函數,它在排序後的輸出運行,使得map輸出更緊湊。減小寫到磁盤的數據和傳輸給reduce的數據。
每次環形換衝區的內存達到閾值時,就會溢寫到一個新的文件,所以當一個map溢寫完以後,本地會存在多個分區切排序的文件。在map完成以前會把這些文件合併成一個分區且排序(歸併排序)的文件,能夠經過參數mapreduce.task.io.sort.factor控制每次能夠合併多少個文件。
在map溢寫磁盤的過程當中,對數據進行壓縮能夠提交速度的傳輸,減小磁盤io,減小存儲。默認狀況下不壓縮,使用參數mapreduce.map.output.compress控制,壓縮算法使用mapreduce.map.output.compress.codec參數控制。
map任務完成後,監控做業狀態的application master便知道map的執行狀況,並啓動reduce任務,application master而且知道map輸出和主機之間的對應映射關係,reduce輪詢application master便知道主機所要複製的數據。
一個Map任務的輸出,可能被多個Reduce任務抓取。每一個Reduce任務可能須要多個Map任務的輸出做爲其特殊的輸入文件,而每一個Map任務的完成時間可能不一樣,當有一個Map任務完成時,Reduce任務就開始運行。Reduce任務根據分區號在多個Map輸出中抓取(fetch)對應分區的數據,這個過程也就是Shuffle的copy過程。。reduce有少許的複製線程,所以可以並行的複製map的輸出,默認爲5個線程。能夠經過參數mapreduce.reduce.shuffle.parallelcopies控制。
這個複製過程和map寫入磁盤過程相似,也有閥值和內存大小,閥值同樣能夠在配置文件裏配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操做和合並文件操做。
若是map輸出很小,則會被複制到Reducer所在節點的內存緩衝區,緩衝區的大小能夠經過mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在節點的內存緩衝區達到閥值,或者緩衝區中的文件數達到閥值,則合併溢寫到磁盤。
若是map輸出較大,則直接被複制到Reducer所在節點的磁盤中。隨着Reducer所在節點的磁盤中溢寫文件增多,後臺線程會將它們合併爲更大且有序的文件。當完成複製map輸出,進入sort階段。這個階段經過歸併排序逐步將多個map輸出小文件合併成大文件。最後幾個經過歸併合併成的大文件做爲reduce的輸出
當Reducer的輸入文件肯定後,整個Shuffle操做才最終結束。以後就是Reducer的執行了,最後Reducer會把結果存到HDFS上。
在Hadoop集羣環境中,大部分map 任務與reduce任務的執行是在不一樣的節點上。固然不少狀況下Reduce執行時須要跨節點去拉取其它節點上的map任務結果。若是集羣正在運行的job有不少,那麼task的正常執行對集羣內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,咱們不能限制,能作的就是最大化地減小沒必要要的消耗。還有在節點內,相比於內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來講,咱們對Shuffle過程的指望能夠有:
一、完整地從map task端拉取數據到reduce 端。
二、在跨節點拉取數據時,儘量地減小對帶寬的沒必要要消耗。
三、減小磁盤IO對task執行的影響。
在MapReduce計算框架中,主要用到兩種排序算法:快速排序和歸併排序。在Map任務發生了2次排序,Reduce任務發生一次排序:
一、第1次排序發生在Map輸出的內存環形緩衝區,使用快速排序。當緩衝區達到閥值時,在溢寫到磁盤以前,後臺線程會將緩衝區的數據劃分紅相應分區,在每一個分區中按照鍵值進行內排序。
二、第2次排序是在Map任務輸出的磁盤空間上將多個溢寫文件歸併成一個已分區且有序的輸出文件。因爲溢寫文件已經通過一次排序,因此合併溢寫文件時只需一次歸併排序便可使輸出文件總體有序。
三、第3次排序發生在Shuffle階段,將多個複製過來的Map輸出文件進行歸併,一樣通過一次歸併排序便可獲得有序文件。
既然有做業的運行,確定會有做業的失敗,做業的失敗(不考慮硬件,平臺緣由引發的失敗)可能會存在不一樣的問題,以下:
用戶代碼拋出異常(代碼沒寫好):這種狀況任務JVM會在退出以前向application master發送錯誤報告,並記錄進用戶日誌,application master對該做業標記爲failed,並釋放掉佔有的資源容器。
另外一種就是JVM忽然退出,這種狀況節點管理器會注意到進程已經退出,並通知application master將此任務標記爲失敗,若是是由於推測執行而致使任務被終止,則不會被被標記爲失敗。而任務掛起又不一樣,一旦application master注意到有一段時間沒有收到進度更新,便會把任務標記爲失敗,默認爲10分鐘,參數mapreduce.task.timeout控制application master被告知一個任務失敗,將會從新調度該任務執行(會在與以前失敗的不一樣節點上運行),默認重試4次,若是四次都失敗,則做業斷定爲失敗,參數控制爲:
mapreduce.map.maxattempts
mapreduce.reduce.maxattempts
AM也可能因爲各類緣由(如網絡問題或者硬件故障)失效,Yarn一樣會嘗試重啓AM
能夠爲每一個做業單獨配置AM的嘗試重啓次數:mapreduce.am.max-attempts,默認值爲2
Yarn中的上限一塊兒提升:yarn.resourcemanager.am.nax-attempts,默認爲2,單個應用程序不能夠超過這個限制,除非同時修改這兩個參數。
恢復過程:application master向資源管理器發送週期性的心跳。當application master失敗時,資源管理器會檢測到該失敗,並在一個新的容器中啓動application master,並使用做業歷史來恢復失敗的應用程序中的運行任務狀態,使其沒必要從新運行,默認狀況下恢復功能是開啓的,yarn.app.mapreduce.am.job.recovery.enable控制客戶端向application master輪詢做業狀態時,若是application master運行失敗了,則客戶端會向資源管理器resourcemanager詢問和緩存application master地址。
若是節點管理器崩潰或者運行很是緩慢,則就會中止向資源管理器發送心跳信息,若是10分鐘(能夠經過參數yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms設置)資源管理器沒有收到一條心跳信息,則資源管理器將會通知中止發送心跳的節點管理器,並將其從本身的資源池中移除該節點管理器,在該節點上的application master和任務的失敗,都經過如上兩種恢復機制進行恢復。
資源管理器失敗時一個很嚴重的問題,全部的任務將不能被分配資源,做業和容器都沒法啓動,那麼整個經過yarn控制資源的集羣都處於癱瘓狀態。
容錯機制:resourcemanager HA 詳情見:hadoop高可用安裝和原理詳解
更多hadoop生態文章見: hadoop生態系列
參考:
《Hadoop權威指南 大數據的存儲與分析 第四版》