Map Reduce應用包含特定流程:首先編寫map和reduce函數.而後寫一個驅動程序來運行做業.java
Hadoop中的組建是按照Hadoop本身的API配置來進行配置的.一個Configuration類的實例,包括配置屬性及其值的集合.每個屬性都是String類型,值類型多是Java的基本類型或String,Class,java.io.File和String Collection.類型的配置信息並不存儲在xml中.在讀入屬性時,屬性能夠自動識別成一個給定的類,而且get()方法容許一個默認的值,當xml中沒有定義時,系統默認使用默認值.標記爲final的屬性不能被後來定義所覆蓋緩存
GenericOptionParser是一個解釋普通Hadoop命令行選項的類,能夠根據應用須要在Configuration對象中設置,一般不用GenericOptionParser,由於實現Tool接口,用ToolRunner更方便網絡
做業,任務和任務嘗試ID(task attemp id)app
做業ID的格式由job tracker執行的時間和一個有job tracker維護的單獨標識做業job tracker實例遞增的計數器組成.做業編號由1開始jvm
job_200904110811_0002 —— job tracker運行的第二個做業,開始於2009年4月11日8點11分分佈式
任務屬於某個做業,它們的ID即把做業的前綴換成業務的前綴,再加一個後綴來標識業務中的任務(任務ID以0爲起始)函數
task_200904110811_0002_m_0000003 —— ID爲job_200904110811_0002做業中第4個任務oop
任務嘗試在做業執行期間根鬚須要進行分配,若做業在job tracker重啓以後重啓,並恢復運行中的做業,那麼最後的嘗試ID從1000開始計數性能
attemp_200904110811_0002_m_000003_0優化
使用遠程調試器
當一個任務失敗而且沒有足夠記錄來診斷錯誤時,能夠設置運行做業的屬性來指導Hadoop保留做業運行期間產生的全部中間值這些數據能夠單獨在調試器上從新運行那麼出錯的任務.首先將keep.failed.task.files中的配置屬性設置爲true.以便在任務失敗時,taks tracker可以保留足夠多的信息讓任務在相同的輸入數據上從新運行.而後再次運行做業並使用Web用戶界面看任務時在哪一個幾點失敗的(看嘗試任務).接着運行IsolationRunner.用前面保留的文件做爲輸入登陸到任務失敗的節點.尋找任務嘗試目錄.由mappred.local.dir屬性來設置.這個目錄包含多個文件和子目錄.包括job.xml,包含任務嘗試期間生效的全部做業配置屬性.這些屬性將被IsolationRunner用來建立一個JobConf實例.對於map任務,這個目錄還包含了一個含有輸入劃分文件序列話表示的文件.對於reduce任務,則由一個map輸出備份存儲在output目錄中.work目錄時任務嘗試的工做目錄,將其改成以這個目錄運行IsolationRunner以後,設置斷點,鏈接遠程調試.
客戶端:提交Map Reduce做業
job tracker: 協調做業的運行,主類時JobTracker
task tracker: 運行做業劃分後的任務,主類時TaskTracker
HDFS: 在其餘實體間共享做業的文件
JobClient的submitJob()方法所實現的做業提交過程以下:
1.向job tracker請求一個新的做業ID,即JobTracker.getNewJobID()
2.檢查做業的輸出說明,若沒有指定輸出目錄或已存在,做業將不會被提交,並將錯誤返回給MapReduce程序
3.計算做業的輸入劃分,若劃分無計算,做業將不會被提交,並將錯誤返回給Map Reduce程序
4.將運行做業所須要的資源(做業JAR文件,配置文件,計算所得的輸出劃分)複製到一個以做業ID號爲命名的目錄中.jobtracker的文件系統
5.告訴jobtracker做業準備執行.JobTracker.submitJob()
做業的初始化
JobTracker接收到其submitJob()方法的調用後,會把此調用放入一個內部分隊列中,交由做業調度器進行調度,並將其進行初始化.初始化包括建立一個表明該正在運行的做業的對象,它封裝任務和記錄信息,以便跟蹤任務的狀態和進程.要建立運行任務列表.做業調度器先從共享文件系統中獲取JobClient已計算好的輸入劃分信息.而後爲每一個劃分建立一個map任務.建立的reduce任務的數量由JobConf的mappred.reduce.task屬性決定.而後調度器便建立這麼多reduce任務來運行,任務在此指定ID號.
任務分配
TaskTracker執行一個簡單的循環,按期發送心跳方法調用JobTracker.心跳告訴JobTracker, TaskTracker是否還存活.同時也充當二者間之間的消息通道.做爲心跳方法調用過的一部分,TaskTracker會指明他是否已經準備運行新的任務.如果,jobtracker會爲它分配一個任務,並使用心跳方法的返回值與tasktracker進行通訊
在JobTracker爲TaskTracker選擇任務前,jobtracker必須先選定任務所在的做業.默認狀況下,jobtracker維護一個做業優先級別列表.選擇做業後,jobtracker就能夠爲改做業選定一個任務.針對map任務和reduce任務,taskTracker又固定數量的槽,默認調度器在處理reduce任務槽以前會填滿空閒的map任務槽.所以tasktracker至少有一個空閒任務槽.jobtracker會爲它選擇一個map任務,不然選擇一個reduce任務.要選擇一個reduce任務.jobtracker只是簡單地從向未運行到reduce任務列表中選取下一個來執行,並無考慮數據的本地化.然而相對於一個map任務,它考慮的是tasktracker的網絡位置和選取一個距離器輸入劃分文件最近的tasktracker.在理解狀況下,任務是data-local的,與分割文件所在節點運行在相同的節點上.一樣,任務可能也是task-local的,和分隔文件在同一個機架上,但不在同一個節點.
任務的執行
首先,它本地化做業的JAR文件,將它從共享文件系統複製到tasktracker所在的文件系統.同時,將應用程序所須要的所有文件從分佈式緩存複製到本地磁盤,而後爲任務新建一個本地工做目錄,並把JAR文件中的內容解壓到這個文件夾下.新建一個TaskRunner實例來運行任務.TaskRunner啓動一個新的java jvm來運行每一個任務.使得用戶定義的map和reduce函數的任何的任何缺陷都不會影響tasktracker.但在不一樣任務間重用JVM仍是可能的.子進程經過umbilical接口與文件進程進行通訊,它每隔幾秒便告知文件進程它的進度,直到任務完成.
流和管道都運行特殊的map和reduce任務,目的是運行用戶提供的可執行程序,並與之通訊.應用流時,流任務使用標準輸入和輸出流.與進程進行通訊.另外一方面,管道任務則監聽套接字,發送環境中的一個端口給C++進程.Java進程都會把輸入鍵/值對傳給外部進程,後者經過用戶定義的map或reduce函數來執行它並把輸出的鍵/值對傳回給Java進程.
進程和態度的更新
Map Reduce的做業和它的每一個任務都有一個狀態,包括做業或任務的狀態(運行,成功,完成,失敗),map和reduce的進度,做業計數器的值,狀態消息或描述.任務正在運行時,對任務進度保持追蹤.對於map任務,是已處理完成輸入的百分比,對於reduce任務,仍會估計reduce輸入已處理的百分比.整個過程與shuffle的三個結對相對應.
構成Hadoop進行的全部操做以下:讀如一條輸入記錄(在mapper或reducer中);寫入一條輸出記錄(在mapper或reducer中);在一個報告中設置狀態描述(在Reporter()的setStatus()方法);增長一個計數(使用Reporte的incrCounter()方法);調用Reporter的progress()方法
若任務報告了進度,便會設置一個標誌以代表狀態變化將被髮送到tasktracker.在另外一個線程中,每隔三秒檢查次標誌一次.若已設置,則告知task tracker當前任務狀態.同時,task tracker每隔五秒鐘發送心跳到job tracker,而且在此調用中,全部由task tracker運行的任務.它們的狀態都會發送至job tracker.計數器的發送間隔一般大於5秒.job tracker將這些更新合併起來,產生一個全局視圖,代表正在運行的全部做業及其所含任務的狀態.JobClient經過每秒查看jobtracker來接收最新的狀態.客戶端也可使用JobClient的getJob()方法來獲得一個RunningJob的實例,包含全部狀態信息.
做業的完成
job tracker收到做業最後一個任務已經完成的通知後,便將做業的狀態設置成爲「成功」.而後在JobClient查詢狀態時,它將得知任務已經成功完成,便顯示一條消息通知用戶,從runJob()返回.
失敗
任務失敗
子任務失敗最多見的狀況是map或reduce任務中的用戶由於代碼拋出的運行時異常.若發生這種狀況,子任務jvm進程會在退出以前向其task tracker父進程發送錯誤報告.錯誤報告最後被記入用戶日誌.task tracker會將此任務嘗試(task attempt)標記爲failed,釋放一個槽以便運行另外一任務
對於流任務,若流程以非零退出代碼退出運行,會被標記爲failed.這是由stream.non.zero.exit.is.faillure=true決定的
對於jvm忽然退出,可能有JVM錯誤,由MapReduce用戶代碼某些特殊緣由而形成JVM退出.在這種狀況下,task tracker會注意到進程已經退出,並將此嘗試標記爲failed.
對於任務的掛起,task tracker注意到已經有一段時間沒有新的任務,會將任務標記爲failed.在此以後,子JVM進程將被自動殺死.任務失敗的超時時間間隔一般爲10分鐘
job tracker被通知一個任務失敗時,它將從新調度該任務的執行.job tracker會嘗試避免從新調度以前失敗過的task tracker上的任務.此外,若一個任務的失敗次數超過4次,他就不會再被重試.針對map任務,能夠由mapred.map.max.attempts屬性設置.對於reduce任務,則由mapred.reduce.max.attempts屬性設置.同時還能夠經過mapred.max.map.failure.percent和mapred.max.reduce.failure.percent屬性來設置在不觸發任務失敗的狀況下,容許任務失敗的最屢次數.
task tracker失敗
若task tracker因爲崩潰或運行過於緩慢而失敗,它將中止向job tracker發送心跳.job tracker會在接收心跳超時後將它從等待任務調度的tasktracker池中移除.同時會安排此tasktracker上已經運行併成功完成的map任務返回.若tasktracker失敗的次數遠高於集羣平均任務失敗的次數,他就會被放入黑名單.被放入黑名單中的task tracker能夠經過重啓從job tracker的黑名單中被移除
job tracker失敗
job tracker失敗是一個單點故障
做業的調度
能夠同時設置mapred.job.priority屬性或JobClient的setJobPriority()的方法來設置優先級,但在FIFO調度中,優先級並不支持搶佔
Fair Scheduler的目標是讓每一個用戶的平均的共享集羣.若只有一個做業在運行,它獲得整個集羣的全部資源.做業被放入池中.默認狀況下,每一個用戶都有本身的池.提交做業超過其餘用戶的用戶,不會所以而比其餘用戶得到超過平均值的集羣資源.能夠用map和reduce的槽數來定義用戶池的最小容量.也能夠設置每一個池的權重.Fair Scheduler支持搶佔.若一個池在特定的一段時間內未能獲得公平的資源分配.調度器就會終止運行池中的獲得過多資源的任務,以便把槽給資源不足的池
shuffle和排序
MapReduce保證每一個reducer的輸入都已按鍵排序.系統執行排序的過程——map輸出傳到reducer做爲後者的輸入,即成爲shuffle(混洗或洗牌).
map端
map函數利用緩衝的方式寫到內存,並處於效率的緣由預先進行排序.每一個map任務都有一個環形內存緩衝區,任務會把輸出寫到此.默認狀況下,緩衝區的大小爲100mb,此值能夠經過io.sort.mb屬性來修改.當緩衝區內容達到制定大小時(io.sort.spill.percent,默認爲80%),一個後臺線程便開始把內容溢出寫到磁盤中.在線程工做的同時,map輸出繼續被寫到緩衝區,但若此時緩衝區被填滿,map會阻塞直到溢寫過程結束
溢寫將按輪詢方式寫到mapred.local.dir屬性制定的目錄,在一個做業相關子目錄中.在寫到磁盤以前,線程首先根據數據最終被傳送到reducer,將數據劃分紅相應的分區.在每一個分區中,後臺線程按鍵進行內排序(in-memory sort).此時,如有一個combiner,它將基於排序後輸出運行.
一旦內存緩衝區達到溢寫閾值,就會新建一個溢寫文件.所以在map任務寫入其最後一個輸出記錄後,會有若干溢寫文件.在任務完成以前,溢寫文件被合併成一個已分區且已排序的輸出文件.配置屬性io.sort.factor控制着一次最多能合併多少流,默認時10.若已制定combiner,而且溢寫次數至少爲3(min.num.spills..for.combine屬性的值)時,combiner就在輸出文件被寫以前執行.運行combiner的意義在於使map輸出更緊湊,從而只有較少數據被寫到本地磁盤而後傳給reducer.
map輸出被寫到磁盤時默認狀況使不壓縮的.能夠將mapred.compress.map.outout設置爲true,就能夠壓縮輸出.reduer經過HTTP獲得輸出文件的分區.用戶服務於文件分區的工做線程,其次數量由任務的tracker.http.threads屬性來控制.此設置針對的使每一個task tracker,而不是針對每一個map任務槽.默認是40.
reduce端
reduce任務須要爲其特定分區文件從集羣上若干個map任務的map輸出.map任務能夠在不一樣時間完成,所以只要一個任務結束,reduce任務就開始複製其輸出.reduce任務由少許複製線程,所以可以並行地獲取map輸出.默認是5個線程(mapred.reduce.parallel.copied).
map任務完成後,會通知其父tasktracker狀態已更新,而後tasktracker進而通知jobtracker.對於制定的做業,jobtracker直到map輸出和tasktracker之間的映射關係.reducer中的一個線程按期向job tracker獲取map輸出位置.直到獲得全部輸出位置.tasktracker並無在第一個reducer檢索以後就當即從磁盤上刪除map輸出,由於reducer可能失敗.反之,它們會等待,直到被jobtracker告知能夠刪除.
若map的輸出至關小,則會被複制到reduce tasktracker的內存中,不然被複制到磁盤中.內存緩衝區達到閾值大小或達到map輸出閾值時,會被合併,進而被溢寫到磁盤中.隨着磁盤上積累的副本越來越多,後臺線程會將它們合併爲一個更大的,排好序的文件.任何壓縮的map輸出,都必須在內存中被解壓縮,以便合併
全部map輸出被複制期間,reduce任務進入排序階段,這個極端將合併map輸出,維持其按順序排序.最後階段,即reduce階段,合併直接把數據輸入reduce函數.最後的合併便可來自內存,也可來自磁盤.在reduce階段,對已排序輸出中的每一個鍵依次調用reduce函數.此階段的輸出直接寫到輸出文件系統,通常爲HDFS.若採用HDFS,因爲tasktracker節點也運行數據節點,因此第一個塊副本會被寫到本地磁盤.
優化的通常原則是爲shuffle制定儘可能多的內存空間.然而還要確保map和reduce函數可以由足夠的內存來運行.爲map和reduce任務運行的JVM制定的內存代銷由mapred.child.java.opts屬性來設置.應讓任務節點上這個內存大小盡可能大.在map這一端,能夠經過避免屢次磁盤溢寫來獲取最佳性能.若能夠,應增長io.sort.mb的值.MapReduce計數器將計算在做業運行過程當中溢寫到磁盤中的記錄總數.在reduce這一端,當中間值可以所有存放在內存中,就能得到最佳性能.
任務的執行
MapReduce模型將做業分割成任務,而後並行運行任務,使做業的總體執行時間少於順序執行的時間.這使得做業執行時間對運行緩慢的任務很敏感.當一個任務比預期要慢時,Hadoop會進行檢測,啓動另外一個相同的任務做爲備份.這就是所謂的任務的推測式執行.推測式執行的任務只有在一個做業全部任務都啓動以後才啓動,而且推測式執行任務只針對已運行一段時間且比做業中其餘任務平均進度慢的任務.一個任務成功完成後,任何正在運行的副本任務都會被終止.
任務JVM重用
啓用JVM重用後,任務並無同時運行在一個JVM中.JVM順序運行任務.儘管task tracker能夠一次運行多個任務,但都運行在獨立的JVM中.由mapred.job.reuse.jvm.num.tasks指定做業每一個JVM運行的task任務的最大數量.默認爲1.共享JVM的另外一個用處使做業各個任務間的狀態共享,將相關數據存儲到一個靜態的字段後,任務就能夠較快速訪問共享數據
跳過壞記錄
處理錯誤記錄的最佳位置使mapper和reducer代碼.能夠檢測出錯誤記錄並忽略它,或經過拋出一個異常來取消這個做業.還可使用計數器來計算做業中錯誤記錄的總數,從而查看問題所影響的範圍.
極少數狀況下bug存在於第三方的庫中,且沒法在mapper或reducer中修改它.此時能夠用skipping模式選項來自動跳過錯誤記錄.skipping模式啓用後,任務將正在處理的記錄報告個task tracker.任務失敗時,task tracker從新運行該任務,跳過異常任務失敗的記錄.因爲增長網絡流量和錯誤記錄的維護,只有在任務失敗兩次後纔會啓用skipping模式.
默認狀況下,skpping模式使關閉的,可使用SkipBadRedcord類單獨爲map和reduce任務啓動它.每次進行任務嘗試,skipping模式都只能檢測出一個錯誤記錄,所以這種機制僅適用於檢測個別錯誤記錄.Hadoop檢測出來的記錄以序列文件的形式保存在做業輸出目錄中的_logs/skip子目錄下
任務執行環境
確保同一個任務的多個實例不會嘗試向同一個文件進行讀寫操做.須要避免兩個問題.第一個問題是若任務失敗並被重試,那麼在第二個任務運行時原來的輸出部分依舊是存在的,因此應先刪除第一個任務的舊文件.第二個是在啓用推測模式執行的狀況下,同一任務的兩個實例會同時向同一個文件進行寫操做.Hadoop經過將輸出寫到任務嘗試指定的臨時文件,解決了任務的常規輸出問題.這個目錄是{mapred.out put.dir}/_temporart/${mapred.task.id}.若任務執行成功,目錄的內容就被複制到此做業的輸出目錄(${mapred.out put.dir}).所以,若一個任務失敗並重試,第一個任務嘗試的部分輸出就會被清除.一個任務和該任務的推測實例位於不一樣的工做目錄,而且只有先完成的任務纔會把其餘工做目錄中的內容傳到輸出目錄,其餘的都被丟棄
任務完成時提交輸出的方法由一個OutputCommitter來實現,它與OutputFormate相關聯.FileOutputFormat的OutputCommitter是一個FileOutputCommitter,後者實現了前面描述的提交規則.OutputFormat的getOutputCommitter()方法也會被覆蓋以返回一個自定義的OutputCommitter,以避免用不一樣的方式來實現提交過程
一個任務能夠經過檢索其配置文件中mapred.work.output.dir屬性的值來找到它的工做目錄.或使用FileOutputFormat的getWorkOutputPath()靜態方法以獲得表明工做目錄的Path對象.