MapReduce是一種可用於數據處理的編程模型(或計算模型),該模型能夠比較簡單,但想寫出有用的程序卻不太容易。MapReduce能將大型數據處理任務分解成不少單個的、能夠在服務器集羣中並行執行的任務,而這些任務的計算結果能夠合併在一塊兒計算最終的結果。最重要的是,MapReduce的優點在於易於編程且能在大型集羣(上千節點)並行處理大規模數據集,以可靠,容錯的方式部署在商用機器上。html
從MapReduce的全部長處來看,它基本上是一個批處理系統,並不適合交互式分析。不可能執行一條查詢並在幾秒內或更短的時間內獲得結果。典型狀況下,執行查詢須要幾分鐘或更多時間。所以,MapReduce更適合那種沒有用戶在現場等待查詢結果的離線使用場景。算法
在MapReduce整個過程能夠歸納爲如下過程:編程
input split --> map --> shuffle --> reduce --> output數組
下圖是《Hadoop權威指南》給出的MapReduce運行過程:緩存
MapReduce運行過程圖 服務器
MapReduce做業是客戶端須要執行的一個工做單元:它包括輸入數據、MapReduce程序和配置信息。Hadoop將做業分紅若干個任務(task)來執行,其中包括兩類任務:map任務和reduce任務。這些任務運行在集羣的節點上,並經過YARN進行調度。若是一個任務失敗,它將在另外一個不一樣的節點上自動從新調度運行。網絡
Hadoop將MapReduce的輸入數據劃分紅等長的小數據塊,稱爲輸入分片(input split)或簡稱「分片」。Hadoop爲每個分片構建一個map任務,並由該任務來運行用戶自定義的map函數從而處理分片中的每條記錄。數據結構
擁有許多分片,意味着處理每一個分片所須要的時間少於處理整個輸入數據所花的時間。所以,若是咱們並行處理每一個分片,且每一個分片數據比較小,那麼整個處理過程將得到更好的負載平衡,由於一臺較快的計算機可以處理的數據分片比一臺較慢的計算機更多,且成必定的比例。即便使用相同的機器,失敗的進程或其餘併發運行的做業可以實現滿意的負載平衡,而且隨着分片被切分得更細,負載平衡的質量會更高。另外一方面,若是分片切分得過小,那麼管理分片得總時間和構建map任務得總時間將決定做業的整個執行時間。對於大多數做業來講,一個合理的分片大小趨向於HDFS的一個塊的大小,這樣能夠確保存儲在單個節點上的最大輸入塊的大小。數據塊默認是128MB,不過能夠針對集羣調整這個默認值,或在每一個文件建立時指定。併發
map任務會將集合中的元素從一種形式轉化成另外一種形式,在這種狀況下,輸入的鍵值對會被轉換成零到多個鍵值對輸出。其中輸入和輸出的鍵必須徹底不一樣,輸入和輸出的值則可能徹底不一樣。app
Hadoop在存儲有輸入數據(HDFS中的數據)的節點上運行map任務,能夠得到最佳性能,由於它無需使用寶貴的集羣帶寬資源。這就是所謂的「數據本地化優化」。可是,有時對於一個map任務的輸入分片來講,存儲該分片的HDFS數據塊複本的全部節點可能正在運行其餘map任務,此時做業調度須要從某一數據塊所在的機架中的一個節點上尋找一個空閒的map槽(slot)來運行該map任務分片。僅僅在很是偶然的狀況下(該狀況基本上不會發生),會使用其餘機架中的節點運行該map任務,這將致使機架與機架之間的網絡傳輸。下圖顯示了這三種可能性。
map任務的網絡傳輸的三種可能性圖
map任務的輸出被稱爲中間鍵和中間值,會被髮送到reducer作後續處理。但輸出結果只寫入本地硬盤,而非HDFS。這是爲何?由於map的輸出是中間結果:該中間結果由reduce任務處理後才產生最終輸出結果,並且一旦做業完成,map的輸出結果就能夠刪除。所以,若是把它存儲在HDFS中並實現備份,不免有些小題大作。若是運行map任務的節點在將map中間結果傳送給reduce任務以前失敗,Hadoop將在另外一個節點上從新運行這個map任務以再次構建map中間結果。
shuffle和排序在MapReduce流程圖中的執行過程
MapReduce確保每一個reducer的輸入都是按鍵排序的。系統執行排序、將map輸出做爲輸入傳給reduce的過程稱爲shuffle。在此,咱們將學習shuffle是如何工做的,由於它有助於咱們理解工做機制(若是須要優化MapReduce程序)。shuffle屬於不斷被優化和改進的代碼庫的一部分,所以下面的描述有必要隱藏一些細節。從許多方面來看,shuffle是MapReduce的「心臟」,是奇蹟發生的地方。
map端shuffle過程
1. 讀取HDFS上的輸入分片input split,每一行解析成一個<key, value>。每個鍵值對調用一次map函數。輸入<0,helloyou>,<10,hello me>。
2. 覆蓋map(),接收1中產生的<key, value>,而後進行處理,轉換爲新的<key, value>輸出。每一個map任務都有一個環形內存緩存區,輸出結果會暫且放在環形內存緩衝區中(該緩衝區的大小默認爲100MB,由mapreduce.task.io.sort.mb屬性控制),當該緩衝區快要溢出時(默認爲緩衝區大小的80%,由mapreduce.map.sort.spill.percent屬性控制),會由單獨線程在本地文件系統中建立一個臨時溢出文件(spill file),將該緩衝區中的數據寫入這個文件,但若是再此期間緩衝區被填滿,map會被堵塞直到寫入過程完成。溢出寫過程按輪詢方式將緩衝區中的內容寫到mapreduce.cluster.local.dir屬性在做業特定子目錄下指定的目錄中。輸出:<hello, 1>,<you, 1>,<hello, 1>,<me, 1>。
注:當緩衝區的數據值達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢出寫線程啓動,鎖定這80MB的內存,執行溢出寫過程。map任務的輸出結果還能夠往剩下的20MB內存中寫,互不影響。
3. 對2輸出的<key, value>進行分區,默認分爲一個區,MapReduce提供Partitioner接口,做用就是根據key或value及reduce的數量來決定當前的輸出數據最終應該交由哪一個reduce任務處理。默認對key hash後再以reduce任務數據取模。默認的取模方式只是爲了平均reduce的處理能力,若是用戶本身對Partitioner有需求,能夠定製並設置到job上。
在寫入磁盤以前,線程首先根據reduce任務的數目將數據劃分爲相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣作是爲了不有些reduce任務分配到大量數據,而有些reduce任務卻分到不多數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。接下來對不一樣分區中的數據進行排序(按照key),也就是對這80MB空間內的key作排序(sort),這裏的排序是對序列化的字節作的排序。若是此時設置了Combiner,將排序後的結果進行Combiner操做,若是至少存在3個溢出文件(經過mapreduce.map.combine.minspills屬性設置)時,則combiner就會在輸出文件寫到磁盤以前再次運行,這樣作的目的是讓儘量減小數據寫入到磁盤和傳遞給reduce的數據。排序後:<hello, 1>,<hello, 1>,<me, 1>,<you, 1>,Combiner後:<hello, {1, 1}>,<me, {1}>,<you, {1}>。
注:combiner能夠在輸入上反覆運行,但並不影響最終結果。若是隻有1或2個溢出文件,那麼因爲map輸出規模減小,於是不值得調用combiner帶來的開銷,所以不會爲該map輸出再次運行combiner。
4. 當map任務輸出最後一個記錄時,可能會有不少的溢出文件,這時須要將這些文件合併(merge)。合併的過程當中會不斷地進行排序和combiner操做,目的有兩個:
① 儘可能減小每次寫入磁盤的數據量;
② 儘可能減小下一複製階段網絡傳輸的數據量。
最後合併成了一個已分區且已排序的輸出文件。配置屬性mapreduce.task.io.sort.factor控制着一次最多能合併多少流,默認值是10。爲了減小網絡傳輸的數據量,節約磁盤空間和寫磁盤的速度更快,這裏能夠將數據壓縮,只要將mapreduce.map.output.compress設置爲true就能夠。數據壓縮算法有DEFLATE、gzip、bzip二、LZO、LZ四、Snappy等,能夠經過mapreduce.map.output.compress.codec配置壓縮類型便可。
5. 將分區中的數據拷貝給相對應的reduce任務(可選)。reducer經過HTTP獲得輸出文件的分區。用於文件分區的工做線程的數量由任務的mapreduce.shuffle.max.threads屬性控制,此設置針對的是每個節點管理器,而不是針對每一個map任務。默認值0將最大線程數設置爲機器中處理器數量的兩倍。
有人可能會問:分區中的數據怎麼知道它對應的reduce是哪一個呢?其實map任務一直和其節點上的Application Master保持聯繫,而Application Master又一直和Application Manager保持心跳。因此Application Manager中保存了整個集羣中的宏觀信息。只要reduce任務向ApplicationManager獲取對應的map輸出位置就OK了。
至此,map端的全部工做已經結束了,最終生成的這個文件也存放在運行map任務的tasktracker的本地磁盤上(但reduce輸出並不這樣)。每一個reduce任務不斷地經過RPC從JobTracker那獲取map任務是否完成的信息,若是reduce任務獲得通知,獲知某臺TaskTracker上的map任務執行完成,shuffle的後半段過程開始啓動。
如今,tasktracker須要爲分區文件運行reduce任務。下圖是reduce端shuffle過程圖:
reduce端shuffle過程圖
1. copy過程,簡單地拉取數據。reduce任務須要集羣上若干個map任務的map輸出做爲其特殊的分區文件。每一個map任務的完成時間可能不一樣,所以在每一個任務完成時,reduce任務就開始經過HTTP方式請求複製其輸出。 reduce任務由少許複製線程,所以可以並行取得map輸出。默認值是5個線程,但這個默認值能夠修改設置mapreduce.reduce.shuffle.parallelcopies屬性便可。
若是map輸出至關小,會被複制到reduce任務JVM的內存(緩衝區大小由mapreduce.reduce.shuffle.input.buffer.percent屬性控制,指定用於此用途的堆空間的百分比),不然,map輸出被複制到磁盤。一旦內存緩衝區達到閾值大小(由mapreduce.reduce.shuffle.merge.percent決定,默認是0.66)或達到map輸出閾值(由mapreduce.reduce.merge.inmem.threshold控制),則合併後溢出寫到磁盤中。若是指定combiner,則在合併期間運行它以下降寫入硬盤的數據量。
隨着磁盤上副本增多,後臺線程會將它們合併爲更大的、排好序的文件。這會爲後面的合併節省一些時間。
2. merge階段。從map端copy過來的數據會先放入JVM的內存緩衝區中,這裏的緩衝區大小要比map端更爲靈活,它基於JVM的heap size設置的,由於shuffle階段reducer不運行,因此絕大部分的內存都給shuffle使用。這個merge階段將合併map輸出,維持其順序排序。這是循環進行的。好比,若是由50個map輸出,而合併因子是10(10爲默認設置,由mapreduce.task.io.sort.factor屬性設置,與map的合併相似),合併將進行5趟。每趟將10個文件合併成一個文件,所以最後有5箇中間文件。
Merge有三種形式:一、內存到內存;二、內存到磁盤;三、磁盤到磁盤。默認狀況下第一種形式是不啓動的。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map端相似,這也是溢寫的過程,在這個過程當中若是設置了combiner,也是會啓動的,而後在磁盤中合併溢寫文件。第二種merge方式一直再運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的輸出文件。
3. reduce階段。這是最後階段了,直接把數據輸入reduce函數,也就是對已排序輸出中的每一個鍵調用reduce函數,從而省略了一次磁盤往返行程,並無將這5個文件合併成一個已排序的文件做爲最後一趟。最後的合併能夠來自內存和磁盤片斷。此階段的輸出直接寫到輸出文件系統,通常爲HDFS。若是採用HDFS,因爲節點管理器(NodeManager)也運行數據節點(DataNode),因此第一個塊複本將被寫到本地磁盤。
reduce任務並不具有數據本地化的優點,單個reduce任務的輸入一般來自於全部map任務的輸出,或者接收到不一樣map任務的輸出。在本例中,咱們假設僅有一個reduce任務,其輸入是全部map任務的輸出。所以,排過序的map輸出需經過網絡傳輸發送到運行reduce任務的節點。數據在reduce端合併,而後由用戶定義的reduce函數處理。reduce的輸出一般存儲在HDFS中以實現可靠存儲。對於reduce輸出的每一個HDFS塊,第一個複本存儲在本地節點上,其餘複本出於可靠性考慮存儲在其餘機架的節點中。所以,將reduce的輸出寫入HDFS確實須要佔用網絡帶寬,但這與正常的HDFS管線寫入的消耗同樣。
一個reduce任務的完整數據流如圖所示。虛線框表示節點,虛線箭頭表示節點內部的數據傳輸,而實線箭頭表示不一樣節點之間的數據傳輸。
一個reduce任務的MapReduce數據流圖
reduce任務的數量並不是由輸入數據的大小決定,相反是獨立指定的。
若是有好多個reduce任務,每一個map任務就會針對輸出進行分區(partition),即爲每一個reduce任務建一個分區。每一個分區有許多鍵(及其對應的值),但每一個鍵對應的鍵-值對記錄都在同一個分區中。分區可由用戶定義的分區函數控制,但一般用默認的partitioner經過哈希函數來分區,很高效。
通常狀況下,多個reduce任務的數據流以下圖所示。該圖很清晰地代表了爲何map任務和reduce任務之間的數據流稱爲shuffle(混洗),由於每一個reduce任務的輸入都來自許多map任務。shuffle通常比圖中所示的更復雜(上下節已描述了大概),並且調整混洗參數對做業總執行時間的影響很是打。
多個reduce任務的數據流圖
最後,當數據處理能夠徹底並行(即無需混洗時),可能會出現無reduce任務的狀況。在這種狀況下,惟一的非本地節點數據傳輸是map任務將結果寫入HDFS,參見下圖所示。
無reduce任務的MapReduce數據流
Map的輸出結果是由Collector處理的,每一個Map任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是爲了更有效地使用內存空間,在內存中放置儘量多的數據。
這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,可是這裏面不光放置了數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序採用的是平臺自身的字節序)的馬甲。數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分二者,分界點不是亙古不變的,而是每次Spill以後都會更新一次。初始的分界點是0,數據的存儲方向是向上增加,索引數據的存儲方向是向下增加,Kvbuffer的存放指針bufindex時指向數據區的,是一直悶着頭地向上增加,好比bufindex初始值爲0,一個Int型的key寫完以後,bufindex增加爲4,一個Int型的value寫完以後,bufindex增加爲8。
1 kvoffsets緩衝區:也叫偏移量索引數組,用於保存key/value信息在位置索引kvindices中的偏移量。當kvoffsets的使用率超過io.sort.spill.percent(默認爲80%)後,便會觸發一次SpillThread線程的「溢寫」操做,也就是開始一次spill階段的操做。
索引數據區域:存元數據信息,都是整數,只存儲分區信息(整數)和kvbuffer在數組中的位置
2 kvindices緩衝區:也叫位置索引數組,用於保存key/value在數據緩衝區kvbuffer中的起始位置。
3 kvbuffer數據緩衝區:用於保存實際的key/value的值。默認狀況下該緩衝區最多可使用io.sort.mb的95%,當kvbuffer使用率超過io.sort.spill.percent(默認80%)後,便會觸發一次SpillThread線程的「溢寫」操做,也就是開始一次spill階段的操做。
整個過程描述以下圖所示。在最高層,有如下5個獨立的實體。
Hadoop運行MapReduce做業的工做原理圖
1. 客戶端提交一個MapReduce做業,Job的submit()方法建立一個內部的JobSummiter實例,而且調用其submitJobInternal()方法。提交做業後,waitForCompletion()每秒輪詢做業的進度,若是發現自上次報告後有改變,便把進度報告到控制檯。做業完成後,若是成功,就顯示做業計數器;若是失敗,則致使做業失敗的錯誤被記錄到控制檯。
2. Job向資源管理器請求一個新應用ID,用於MapReduce做業ID。資源管理器檢查做業的輸出說明和計算做業的輸入分片,若是沒有指定輸出目錄,輸出目錄已存在或者分片沒法計算,那麼做業就不提交,錯誤拋回給MapReduce程序。
3. 將運行做業所須要的資源(包括做業JAR文件、配置文件和計算所得的輸入分片)複製到一個以做業ID命名的目錄下的共享文件系統中。做業JAR的複本較多(由mapreduce.client.submit.file.replication屬性控制,默認值爲10),所以在運行做業的任務時,集羣中有不少個複本可供節點管理器訪問。
4. 經過調用資源管理器的submitApplication()方法提交做業。
5. 資源管理器收到調用它的submitApplication()消息後,便將請求傳遞給YARN調度器(scheduler)。調度器分配一個容器,而後資源管理器在節點管理器的管理下在容器中啓動application master的進程。
6. MapReduce做業的application master是一個Java應用程序,它的主類是MRAppMaster。因爲將接受來自任務的進度和完成報告,所以application master對做業的初始化是經過建立多個薄記對象以保持對做業進度的跟蹤來完成的。
7. 對每個分片建立一個map任務對象以及由mapreduce.job.reduces屬性(經過做業的setNumReduceTasks()方法設置)肯定的多個reduce任務對象。任務ID在此時分配。
application master必須決定如何運行構成MapReduce做業的各個任務。若是做業很小,就選擇和本身在同一個JVM上運行任務。與在一個節點上順序運行這些任務相比,當application master判斷在新的容器中分配和運行任務的開銷大於並行運行它們的開銷時,就會發生這種狀況。這樣的做業稱爲uberized,或者uber任務(小做業)運行。
默認狀況下,小做業就是少於10個mapper且只有1個reducer且輸入大小小於一個HDFS塊的做業(經過設置mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces和mapreduce.job.ubertask.maxbytes能夠改變這幾個值)。必須明確啓動uber任務(對於單個做業,或者是對整個集羣),具體方法是將mapreduce.job.ubertask.enable設置爲true。
最後,在任何任務運行以前,application master調用setupJob()方法設置OutputCommitter。FileOutputCommitter爲默認值,表示將創建做業的最終輸出目錄及任務輸出的臨時工做空間。
8. 若是做業不適合做爲uber任務運行,那麼application master就會爲該做業中的全部map任務和reduce任務向資源管理器請求容器。首先爲Map任務發出請求,該請求優先級要高於reduce任務的請求,這是由於全部的map任務必須在reduce的排序階段可以啓動前完成。直到有5%的map任務已經完成時,爲reduce任務的請求才會發出(慢啓動reduce)。
reduce任務可以在集羣中任意位置運行,但map任務的請求有着數據本地化侷限,這也是調度器所關注的。map任務的三種狀況的詳見1、概念綜述中的map。
請求也爲任務指定了內存需求和CPU數。在默認狀況下,每一個map任務和reduce任務都分配到1024MB的內存和一個虛擬的內核,這些值能夠在每一個做業的基礎上進行配置,配置參考以下表:
屬性名稱 | 類型 | 默認值 | 說明 |
mapreduce.map.memory.mb | int | 1024 | map容器所用的內存容量 |
mapreduce.reduce.memory.mb | int | 1024 | reduce容器所用的內存容量 |
mapreduce.map.cpu.vcores | int | 1 | map容器所用的虛擬內核 |
mapreduce.reduce.cpu.vcoresp.memory.mb | int | 1 | reduce容器所用的虛擬內核 |
9. 一旦資源管理器的調度器爲任務分配了一個特定節點上的容器,application master就經過與節點管理器通訊來啓動容器。該任務由主類爲YarnChild的一個Java應用程序執行。
10. 在它運行任務以前,首先將任務須要的資源本地化,包括做業的配置、JAR文件和全部來自分佈式緩存的文件。
11. 最後,運行map任務或reduce任務。
參考資料:《Hadoop權威指南(第四版)》
https://www.jianshu.com/p/1e542477b59a