一個完整的mapreduce程序在分佈式運行時有三類實例進程:node
一、MRAppMaster:負責整個程序的過程調度及狀態協調緩存
二、mapTask:負責map階段的整個數據處理流程服務器
三、ReduceTask:負責reduce階段的整個數據處理流程併發
一、 一個mr程序啓動的時候,最早啓動的是MRAppMaster,MRAppMaster啓動後根據本次job的描述信息,計算出須要的maptask實例數量,而後向集羣申請機器啓動相應數量的maptask進程負載均衡
二、 maptask進程啓動以後,根據給定的數據切片範圍進行數據處理,主體流程爲:框架
a) 利用客戶指定的inputformat來獲取RecordReader讀取數據,造成輸入KV對jvm
b) 將輸入KV對傳遞給客戶定義的map()方法,作邏輯運算,並將map()方法輸出的KV對收集到緩存分佈式
c) 將緩存中的KV對按照K分區排序後不斷溢寫到磁盤文件oop
三、 MRAppMaster監控到全部maptask進程任務完成以後,會根據客戶指定的參數啓動相應數量的reducetask進程,並告知reducetask進程要處理的數據範圍(數據分區)性能
四、 Reducetask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,而後調用客戶指定的outputformat將結果數據輸出到外部存儲
maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度
那麼,mapTask並行實例是否越多越好呢?其並行度又是如何決定呢?
一個job的map階段並行度由客戶端在提交job時決定
而客戶端對map階段並行度的規劃的基本邏輯爲:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分紅邏輯上的多個split),而後每個split分配一個mapTask並行實例處理
這段邏輯及造成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程以下圖:
a) 簡單地按照文件的內容長度進行切片
b) 切片大小,默認等於block大小
c) 切片時不考慮數據集總體,而是逐個針對每個文件單獨切片
好比待處理數據有兩個文件:
file1.txt 320M file2.txt 10M |
通過FileInputFormat的切片機制運算後,造成的切片信息以下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M |
經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定
minsize:默認值:1 配置參數: mapreduce.input.fileinputformat.split.minsize |
maxsize:默認值:Long.MAXValue 配置參數:mapreduce.input.fileinputformat.split.maxsize |
blocksize |
所以,默認狀況下,切片大小=blocksize
maxsize(切片最大值):
參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值
minsize (切片最小值):
參數調的比blockSize大,則可讓切片變得比blocksize還大
選擇併發數的影響因素:
一、運算節點的硬件配置
二、運算任務的類型:CPU密集型仍是IO密集型
三、運算任務的數據量
若是硬件配置爲2*12core + 64G,恰當的map並行度是大約每一個節點20-100個map,最好每一個map的執行時間至少一分鐘。
l 若是job的每一個map或者 reduce task的運行時間都只有30-40秒鐘,那麼就減小該job的map或者reduce數,每個task(map|reduce)的setup和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,因此若是每一個task都很是快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用[dht1] 能夠改善該問題:
(mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多能夠順序執行的task
數目(屬於同一個Job)是1。也就是說一個task啓一個JVM)
l 若是input的文件很是的大,好比1TB,能夠考慮將hdfs上的每一個block size設大,好比設成256MB或者512MB
reducetask的並行度一樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不一樣,Reducetask數量的決定是能夠直接手動設置:
//默認值是1,手動設置爲4
job.setNumReduceTasks(4);
若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜
注意: reducetask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask
儘可能不要運行太多的reduce task。對大多數job來講,最好rduce的個數最多和集羣中的reduce持平,或者比集羣的 reduce slots小。這個對於小集羣而言,尤爲重要。
Hadoop的發佈包中內置了一個hadoop-mapreduce-example-2.4.1.jar,這個jar包中有各類MR示例程序,能夠經過如下步驟運行:
啓動hdfs,yarn
而後在集羣中的任意一臺服務器上啓動執行程序(好比運行wordcount):
hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out
JVM重用技術不是指同一Job的兩個或兩個以上的task能夠同時運行於同一JVM上,而是排隊按順序執行。
map和reduce是hadoop的核心功能,hadoop正是經過多個map和reduce的並行運行來實現任務的分佈式並行計算,從這個觀點來看,若是將map和reduce的數量設置爲1,那麼用戶的任務就沒有並行執行,可是map和reduce的數量也不能過多,數量過多雖然能夠提升任務並行度,可是太多的map和reduce也會致使整個hadoop框架由於過分的系統資源開銷而使任務失敗。因此用戶在提交map/reduce做業時應該在一個合理的範圍內,這樣既能夠加強系統負載勻衡,也能夠下降任務失敗的開銷。
1 map的數量
map的數量一般是由hadoop集羣的DFS塊大小肯定的,也就是輸入文件的總塊數,正常的map數量的並行規模大體是每個Node是10~100個,對於CPU消耗較小的做業能夠設置Map數量爲300個左右,可是因爲hadoop的沒一個任務在初始化時須要必定的時間,所以比較合理的狀況是每一個map執行的時間至少超過1分鐘。具體的數據分片是這樣的,InputFormat在默認狀況下會根據hadoop集羣的DFS塊大小進行分片,每個分片會由一個map任務來進行處理,固然用戶仍是能夠經過參數mapred.min.split.size參數在做業提交客戶端進行自定義設置。還有一個重要參數就是mapred.map.tasks,這個參數設置的map數量僅僅是一個提示,只有當InputFormat 決定了map任務的個數比mapred.map.tasks值小時才起做用。一樣,Map任務的個數也能經過使用JobConf 的conf.setNumMapTasks(int num)方法來手動地設置。這個方法可以用來增長map任務的個數,可是不能設定任務的個數小於Hadoop系統經過分割輸入數據獲得的值。固然爲了提升集羣的併發效率,能夠設置一個默認的map數量,當用戶的map數量較小或者比自己自動分割的值還小時可使用一個相對交大的默認值,從而提升總體hadoop集羣的效率。
2 reduece的數量
reduce在運行時每每須要從相關map端複製數據到reduce節點來處理,所以相比於map任務。reduce節點資源是相對比較缺乏的,同時相對運行較慢,正確的reduce任務的個數應該是0.95或者1.75 *(節點數 ×mapred.tasktracker.tasks.maximum參數值)。若是任務數是節點個數的0.95倍,那麼全部的reduce任務可以在 map任務的輸出傳輸結束後同時開始運行。若是任務數是節點個數的1.75倍,那麼高速的節點會在完成他們第一批reduce任務計算以後開始計算第二批 reduce任務,這樣的狀況更有利於負載均衡。同時須要注意增長reduce的數量雖然會增長系統的資源開銷,可是能夠改善負載勻衡,下降任務失敗帶來的負面影響。一樣,Reduce任務也可以與 map任務同樣,經過設定JobConf 的conf.setNumReduceTasks(int num)方法來增長任務個數。
3 reduce數量爲0
有些做業不須要進行歸約進行處理,那麼就能夠設置reduce的數量爲0來進行處理,這種狀況下用戶的做業運行速度相對較高,map的輸出會直接寫入到 SetOutputPath(path)設置的輸出目錄,而不是做爲中間結果寫到本地。同時Hadoop框架在寫入文件系統前並不對之進行排序。
map red.tasktracker.map.tasks.maximum 這個是一個task tracker中可同時執行的map的最大個數,默認值爲2,看《pro hadoop》:it is common to set this value to the effective number of CPUs on the node
把ob分割成map和reduce,合理地選擇Job中 Tasks數的大小能顯著的改善Hadoop執行的性能。增長task的個數會增長系統框架的開銷,但同時也會加強負載均衡並下降任務失敗的開銷。一個極端是1個map、1個reduce的狀況,這樣沒有任務並行。另外一個極端是1,000,000個map、1,000,000個reduce的狀況,會因爲框架的開銷過大而使得系統資源耗盡。
Map任務的數量
Map的數量常常是由輸入數據中的DFS塊的數量來決定的。這還常常會致使用戶經過調整DFS塊大小來調整map的數量。正確的map任務的並行度彷佛應該是10-100 maps/節點,儘管咱們對於處理cpu運算量小的任務曾經把這個數字調正到300maps每節點。Task的初始化會花費一些時間,所以最好控制每一個 map任務的執行超過一分鐘。
實際上控制map任務的個數是很 精妙的。mapred.map.tasks參數對於InputFormat設定map執行的個數來講僅僅是一個提示。InputFormat的行爲應該把輸入數據總的字節值分割成合適數量的片斷。可是默認的狀況是DFS的塊大小會成爲對輸入數據分割片斷大小的上界。一個分割大小的下界能夠經過一個mapred.min.split.size參數來設置。所以,若是你有一個大小是10TB的輸入數據,並設置DFS塊大小爲 128M,你必須設置至少82K個map任務,除非你設置的mapred.map.tasks參數比這個數還要大。最終InputFormat 決定了map任務的個數。
Map任務的個數也能經過使用JobConf 的 conf.setNumMapTasks(int num)方法來手動地設置。這個方法可以用來增長map任務的個數,可是不能設定任務的個數小於Hadoop系統經過分割輸入數據獲得的值。
Reduce任務的個數
正確的reduce任務的 個數應該是0.95或者1.75 ×(節點數 ×mapred.tasktracker.tasks.maximum參數值)。若是任務數是節點個數的0.95倍,那麼全部的reduce任務可以在 map任務的輸出傳輸結束後同時開始運行。若是任務數是節點個數的1.75倍,那麼高速的節點會在完成他們第一批reduce任務計算以後開始計算第二批 reduce任務,這樣的狀況更有利於負載均衡。
目前reduce任務的數量 因爲輸出文件緩衝區大小(io.buffer.size × 2 ×reduce任務個數 << 堆大小),被限制在大約1000個左右。直到可以指定一個固定的上限後,這個問題最終會被解決。
Reduce任務的數量同時也控制着輸出目錄下輸出文件的數量,可是一般狀況下這並不重要,由於下一階段的 map/reduce任務會把他們分割成更加小的片斷。
Reduce任務也可以與 map任務同樣,經過設定JobConf 的conf.setNumReduceTasks(int num)方法來增長任務個數。