hadoop之mapreduce詳解(進階篇)

上篇文章hadoop之mapreduce詳解(基礎篇)咱們瞭解了mapreduce的執行過程和shuffle過程,本篇文章主要從mapreduce的組件和輸入輸出方面進行闡述。html

1、mapreduce做業控制模塊以及其餘功能

mapreduce包括做業控制模塊,編程模型,數據處理引擎。這裏咱們重點闡述做業控制模塊MRAppMaster。前端

1.一、MRAppMaster的構成

MRAppMaster主要有以下幾個組件構成,以下圖所示:java

一、ContainerAllocator:與resourcemanager通訊,爲mapreduce申請資源,做業的所需資源描述爲<priority,hostname,capability,containers,relax_locality>5元組格式,分別表示做業的優先級,指望資源所在的節點,資源量,container數目,是否鬆弛本地性。ContainerAllocator週期性的經過RPC與resourcemanager通訊,而resourcemanager則經過心跳應答的方式爲之返回所需的container列表,完成的container列表等信息。算法

ContainerAllocator工做流程:
步驟1:將Map Task的資源需求發送給RM;
步驟2:若是達到了Reduce Task的調度條件,則開始爲Reduce Task申請資源;
步驟3:若是爲某個Task申請到了資源,則取消其餘重複資源的申請。因爲在HDFS中,任何一份數據一般有三個備份,而對於一個任務而言,考慮到rack和any級別的本地性,它可能會對應7個資源請求
步驟4:若是任務運行失敗,則會從新爲該任務申請資源;
步驟5:若是一個任務運行速度過慢,則會爲其額外申請資源以啓動備份任務(若是啓動了推測執行功能);
步驟6:若是一個節點失敗的任務數目過多,則會撤銷對該節點的全部資源的申請請求sql

二、ClientServer:實現了MRClientprotocol協議,客戶端能夠經過該協議獲取到做業的執行狀態(不須要經過resourcemanager)和控制做業(好比殺死做業,改變做業的優先級等)編程

三、Job:是一個mapreduce做業,負責監控做業的運行狀態,維護一個做業的狀態機,實現異步執行各類做業的相關操做架構

四、Task:一個mapreduce做業中一個任務,負責監控一個任務的運行狀態,維護一個任務的狀態機,實現異步執行各類任務的相關操做。app

五、TaskAttempt:表示一個運行實例負載均衡

六、TaskCleaner:負責清理失敗任務或者殺死任務使用的目錄和產生的臨時結果,維護一個線程池和一個共享隊列,異步刪除任務產生的垃圾數據異步

七、Speculator:完成推測執行功能,當一個任務在執行速度上明顯慢於其餘任務的時候,Speculator將會啓動一個功能相同的任務,先執行完成的任務會kill掉沒執行完的那個做業。

八、ContainerLauncher:負責與NodeManager通訊,以啓動container

九、TaskAttempListener:負責各個任務的心跳信息,若是一個任務一段時間內未彙報心跳信息,則認爲該任務死掉了,會將其從系統中移除。

十、JobHistoryEventHandler:負責各個做業的事件記錄日誌,好比做業的建立,運行等,都會寫入hdfs的指定目錄下,對做業的恢復頗有用。

 

1.二、mapreduce客戶端

     是用戶和yarn通訊的惟一途徑,經過該客戶端,用戶能夠向yarn提交做業,獲取做業的運行狀態,以及控制做業(殺死做業或者任務),該客戶端設計到兩個通訊協議:

ApplicationClientProtocol:resourcemanager實現了該協議,客戶端須要經過該協議提交做業,殺死做業,改變做業的優先級等操做

MRClientProtocol:看成業啓動Application Master後,會啓動MRClientServer服務,該服務實現了MRClientProtocol協議,從而容許用戶直接經過該協議直接與Application Master通訊,獲取做業的執行狀態和控制做業,減輕resourcemanager的壓力。

1.三、MRAppMaster工做流程

做業的運行分爲local模式,yarn的uber模式和yarn的非uber模式

      首先看local模式和yarn模式的選擇:客戶端經過JobClient提交做業時,會經過java標準庫中的Serverloader動態加載全部的ClientProtocolProvider的實現。默認狀況下有兩種實現:LocalClientProcotolProvider和YarnClientProcotolProvider。若是在配置中參數mapreduce.framework.mode設置爲yarn時,客戶端則會採用YarnClientProcotolProvider,建立一個YarnRunner對象做爲真正的客戶端,這樣就能夠經過YarnRunner.submitJob方法提交給yarn做業了。在該方法的內部實現會進一步調用ApplicationClientProcotol的submitApplication方法,提交做業給Resourcemanager。源碼詳情可見:hadoop2.7之做業提交詳解(上)

uber模式和非uber模式的選擇:

uber模式是小做業的一個優化,MrAppMaster不會再爲每個任務申請資源,而是讓其重用一個container,map和reduce會在同一份資源上串行執行。

uber模式條件:

mapreduce.job.ubertask.enable #是否啓用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map數 (默認9)
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數 (默認1)
mapreduce.job.ubertask.maxbytes #ubertask最大做業大小 (默認爲block.size)
map和reduce使用的資源不得超過MRAppMaster可以使用的資源

知足如上條件則使用yarn的uber模式執行, 不然爲非uber模式執行

在yarn上運行mapreduce做業須要解決兩個問題:
一、reduce做業啥時候啓動比較合適

由參數mapreduce.job.reduce.slowstart.completedmaps控制,表示當Map Task完成的比例達到該值後纔會爲Reduce Task申請資源,默認是0.05;

二、怎樣完成shuffle過程

當用戶向YARN中提交一個MapReduce應用程序後,YARN將分兩個階段運行該應用程序:
第一個階段是由ResourceManager啓動MRAppMaster;
第二個階段是由MRAppMaster建立應用程序,爲它申請資源,並監控它的整個運行過程,直到運行完成。

具體的運行流程圖以下:

步驟1:用戶向YARN中提交應用程序,其中包括MRAppMaster程序、啓動MRAppMaster的命令、用戶程序等;
步驟2:ResourceManager爲該應用程序分配第一個Container,並與對應的NodeManager通訊,要求它在這個Container中啓動應用程序的MRAppMaster;
步驟3:MRAppMaster啓動後,首先向ResourceManager註冊,這樣用戶能夠直接經過ResourceManager查看應用程序的運行狀態,以後,它將爲內部任務申請資源,並監控它們的運行狀態,直到運行結束,即重複步驟4~7;
步驟4:MRAppMaster採用輪詢的方式經過RPC協議向ResourceManager申請和領取資源;
步驟5:一旦MRAppMaster申請到資源後,則與對應的NodeManager通訊,要求它啓動任務;
步驟6:NodeManager爲任務設置好運行環境(包括環境變量、JAR包、二進制程序等)後,將任務啓動命令寫到一個腳本中,並經過運行該腳本啓動任務;
步驟7:各個任務經過RPC協議向MRAppMaster彙報本身的狀態和進度,以讓MRAppMaster隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務;
步驟8:應用程序運行完成後,MRAppMaster向ResourceManager註銷並關閉本身

1.四、推測執行

    在分佈式集羣環境下,因軟件Bug、負載不均衡或者資源分佈不均等緣由,形成同一個做業的多個任務之間運行速度不一致,有的任務運行速度明顯慢於其餘任務(好比某個時刻,一個做業的某個任務進度只有10%,而其餘全部Task已經運行完畢),則這些任務將拖慢做業的總體執行進度。爲了不這種狀況發生,運用推測執行(Speculative Execution)機制,Hadoop會爲該任務啓動一個備份任務,讓該備份任務與原始任務同時處理一份數據,誰先運行完成,則將誰的結果做爲最終結果。

    推測執行算法的核心思想是:某一時刻,判斷一個任務是否拖後腿或者是不是值得爲其啓動備份任務,採用的方法爲,先假設爲其啓動一個備份任務,則可估算出備份任務的完成時間estimatedEndTime2;一樣地,若是按照此刻該任務的計算速度,可估算出該任務最有可能的完成時間estimatedEndTime1,這樣estimatedEndTime1與estimatedEndTime2之差越大,代表爲該任務啓動備份任務的價值越大,則傾向於爲這樣的任務啓動備份任務。

    這種算法的最大優勢是,可最大化備份任務的有效率,其中有效率是有效備份任務數與全部備份任務數的比值,有效備份任務是指完成時間早於原始任務完成時間的備份任務(即帶來實際收益的備份任務)。備份任務的有效率越高、推測執行算法就越優秀,帶來的收益也就越大。

   推測執行機制實際上採用了經典的算法優化方法:以空間換時間,它同時啓動多個相同任務處理同一份數據,並讓這些任務競爭以縮短數據處理時間,顯然這種方法須要佔用更多的計算資源,在集羣資源緊缺的狀況下,應合理使用該機制,爭取在多用少許資源狀況下,減小大做業的計算時間。

參數控制:

mapreduce.map.speculative
mapreduce.reduce.speculative

 2、mapreduce輸入輸出格式

2.一、輸入格式

2.1.一、分片

在FileInputFormat中,分片計算源碼詳見:hadoop2.7做業提交詳解之文件分片

計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的默認值是1,而maxSize的默認值是long類型的最大值,便可得切片的默認大小是blockSize(128M)
maxSize參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值
minSize參數調的比blockSize大,則可讓切片變得比blocksize還大

hadoop爲每一個分片構建一個map任務,能夠並行處理多個分片上的數據,整個數據的處理過程將獲得很好的負載均衡,由於一臺性能較強的計算機能處理更多的數據分片,分片也不能切得過小,不然多個map和reduce間數據的傳輸時間,管理分片,構建多個map任務的時間將決定整個做業的執行時間.(大部分時間都不在計算上)若是文件大小小於128M,則該文件不會被切片,無論文件多小都會是一個單獨的切片,交給一個maptask處理.若是有大量的小文件,將致使產生大量的maptask,大大下降集羣性能.

注意:分片自己不包含數據自己,而是指向數據的引用,存儲位置供mapreduce系統使用以便使得map任務儘可能數據本地化,而分片的大小用來排序,以便優先處理大的分片,
從而作小化做業執行時間。

2.1.二、小文件處理

小文件不只會增長NameNode的存儲壓力,還會增長運行做業時的尋址次數,也會形成map的大批量增長,因此處理小文件是必要的。

一、 在數據處理的前端就將小文件整合成大文件,再上傳到hdfs上,即避免了hdfs不適合存儲小文件的缺點,又避免了後期使用mapreduce處理大量小文件的問題。(最提倡的作法)

二、小文件已經存在hdfs上了,可使用另外一種inputformat來作切片(CombineFileInputFormat),它的切片邏輯和FileInputFormat(默認)不一樣,它能夠將多個小文件在邏輯上規劃到一個切片上,交給一個maptask處理。

2.1.三、如何避免切片

一、動態調整blocksize

二、重寫isSplitable()方法,返回false

2.1.四、文本行整條數據分片存儲

文本行一行記錄是否會被切分存放在兩個分片上,又如何保證數據不丟失和數據不重複。

事實上,Hadoop對這種某一行跨兩個分片的狀況進行了特殊的處理。
一般Hadoop使用的InputSplit是FileSplit,一個FileSplit主要存儲了三個信息<path, start, 分片length>。假設根據設置分片大小爲100,那麼一個250字節大小的文件切分以後,咱們會獲得以下的FileSplit:
<path, 0, 100>
<path, 100, 100>
<path, 200, 50>
(具體的切分算法能夠參考FileInputFormat的實現)

所以,事實上,每一個MapReduce程序獲得的只是相似<path, 0, 100>的信息。當MapReduce程序開始執行時,會根據path構建一個FSDataInputStream,定位到start,而後開始讀取數據。在處理一個FileSplit的最後一行時,當讀取到一個FileSplit的最後一個字符時,若是不是換行符,那麼會繼續讀取下一個FileSplit的內容,直到讀取到下一個FileSplit的第一個換行符。這樣子就保證咱們不會獲得一個不完整的行了。

那麼當MapReduce在處理下一個FileSplit的時候,怎麼知道上一個FileSplit有沒有已經處理了這個FileSplit的第一行內容?
咱們只須要檢查一下前一個FileSplit的最後一個字符是否是換行符,若是是,那麼當前Split的第一行尚未被處理,若是不是,表示當前Split的第一行已經被處理,咱們應該跳過。
在LineRecordReader中,使用了一個很巧妙的方法來實現上述的邏輯,把當前FileSplit的start減一,而後跳過第一行(下面是這個代碼片段)

}else{
if(start!= 0) {
skipFirstLine =true;
--start;
 fileIn.seek(start);
}
in=newLineReader(fileIn, job, recordDelimiter);
 }
if(skipFirstLine) {// skip first line and re-establish "start".
start+=in.readLine(newText(), 0,
(int)Math.min((long)Integer.MAX_VALUE,end-start));
}

2.1.五、經常使用的輸入格式

 

 

 FileInputFormat是全部使用文件做爲其數據源的InputFormat的基類,其提供了兩個功能,一個是指出做業的輸入文件位置,一個是爲輸入文件生成分片的代碼實現。把分片分割成記錄的實現由其子類完成,FileInputFormat層次結構圖以下:

 

 

 

2.二、輸出格式

針對上一節的輸入格式,都會有相對應的輸入格式,OutputFormat層次結構圖以下:

 

 

 經常使用的輸出格式:

 

 

 

更多hadoop生態文章見: hadoop生態系列

參考:

https://blog.csdn.net/appstore81/article/details/15027767
https://www.cnblogs.com/52mm/p/p15.html
https://blog.csdn.net/u011812294/article/details/63262624
https://blog.csdn.net/penggougoude/article/details/82432802#commentBox

《Hadoop權威指南 大數據的存儲與分析 第四版》

《hadoop技術內幕深刻解析yarn架構設計與實現原理》

相關文章
相關標籤/搜索