hadoop學習筆記(十):MapReduce工做原理(重點)

1、MapReduce完整運行流程

 

解析:

1 在客戶端啓動一個做業。算法

2 向JobTracker請求一個Job ID數組

3 將運行做業所須要的資源文件複製到HDFS上,包括MapReduce程序打包的jar文件、配置文件和客戶端計算所得的計算劃分信息。這些文件都存放在JobTracker專門爲該做業建立的文件夾中。文件夾名爲該做業的Job ID。jar文件默認會有10個副本(mapred.submit.replication屬性控制);輸入劃分信息告訴了JobTracker應該爲這個做業啓動多少個map任務等信息。網絡

4 JobTracker接收到做業後,將其放在一個做業隊列裏,等待做業調度器對其進行調度(這裏是否是很像微機中的進程調度呢),看成業調度器根據本身的調度算法調度到該做業時,會根據輸入劃分信息爲每一個劃分建立一個map任務,並將map任務分配給TaskTracker執行。對於map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽這裏需強調的是map任務不是隨隨便便地分配給某個TaskTracker的,這裏有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同事將程序jar包複製到該TaskTracker上來運行,這叫「運算移動,數據不移動」。而分配reduce任務時並不考慮數據本地化。app

5 TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶者不少信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobTracker查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。函數

2、MapReduce任務的Shuffle和排序過程

 Map端流程分析

1 每一個輸入分片會讓一個map任務來處理,默認狀況下,以HDFS的一個塊的大小(默認64M)爲一個分片,固然咱們也能夠設置塊的大小。map輸出的結果會暫且放在一個環形內存緩衝區中(該緩衝區的大小默認爲100M,由io.sort.mb屬性控制),當該緩衝區快要溢出時(默認爲緩衝區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中建立一個溢出文件,將該緩衝區中的數據寫入這個文件。oop

2 在寫入磁盤以前,線程首先根據reduce任務的數目將數據劃分爲相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣作是爲了不有些reduce任務分配到大量數據,而有些reduce任務卻分到不多數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。而後對每一個分區中的數據進行排序,若是此時設置了Combiner,將排序後的結果進行Combianer操做,這樣作的目的是讓儘量少的數據寫入到磁盤。性能

3 當map任務輸出最後一個記錄時,可能會有不少的溢出文件,這時須要將這些文件合併。合併的過程當中會不斷地進行排序和combiner操做,目的有兩個:一、儘可能減小每次寫入磁盤的數據量;二、儘可能減小下一複製階段網絡傳輸的數據量。最後合併成了一個已分區且已排序的文件。爲了減小網絡傳輸的數據量,這裏能夠將數據壓縮,只要將mapred.compress.map.out設置爲true就能夠。優化

數據壓縮:Gzip、Lzo、snappy。spa

4 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎麼知道它對應的reduce是哪一個呢?其實map任務一直和其父TaskTracker保持聯繫,而TaskTracker又一直和obTracker保持心跳。因此JobTracker中保存了整個集羣中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就OK了。線程

Shuffle分析

Shuffle的中文意思是「洗牌」,若是咱們這樣看:一個map產生的數據,結果經過hash過程分區缺分配給了不一樣的reduce任務,是否是一個對數據洗牌的過程呢?

 shuffle的概念:

  Collections.shuffle(List list):隨機打亂list裏的元素順序。

  MapReduce裏的Shuffle:描述着數據從map task輸出reduce task輸入的這段過程。

Map端shuffle的過程:

1 每一個map task都有一個內存緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後在對磁盤中這個map task產生的全部臨時文件作一個合併,生成最終的正式輸出文件,而後等待reduce task來拉數據。

2 在map task執行時,它的輸入數據來源於HDFS的block,固然在MapReduce概念中,map task只讀取split。split與block對應關係多是多對一,默認是一對一。在wordcount例子裏,假設map的輸入數據都是是像「aaa」這樣的字符串。

3 在通過mapper的運行後,咱們得知mapper的輸出是這樣一個key/value對:key是「aaa」,value是數值1。由於當前map端只作加1的操做,在reduce task裏採起合併結果集。前面咱們知道這個job有3個reduce task。那到底當前的「aaa」究竟該丟給哪一個reduce去處理呢?是須要如今作決定的。

4 MapReduce提供Partitioner接口,做用就是根據key或value及reduce的數量來決定當前的輸出數據最終應該交由哪一個reduce task處理。默認對key hash後再以reduce task數據取模。默認的取模方式只是爲了平均reduce的處理能力,若是用戶本身對Partitioner有需求,能夠定製並設置到job上。

5 在例子中,「aaa」通過Partition後返回0,也就是這對值應當交由第一個reduce來處理。接下來,須要將數據寫入內存緩衝區中,緩衝區的做用是批量收集map結果減小磁盤IO的影響。咱們的key/value對以及Partition的結果都會被寫入緩衝區。固然,寫入以前,key與value值都會被序列化成字節數組。

6 內存緩衝區是有大小限制的,默認是100MB。當map task的輸出結果不少時,就可能會撐爆內存,因此須要在必定條件下將緩衝區中的數據臨時寫入磁盤,而後從新利用這塊緩衝區。這個從內存往磁盤寫數據的過程被稱爲spill,中文可理解爲溢寫。溢寫是由單獨線程來完成,不影響往緩衝區寫map結果的線程。溢寫線程啓動時不該該阻止map的結果輸出,因此整個緩衝區有個溢寫的比例spill.percent。比例默認是0.8,也就是當緩衝區的數據值已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啓動,鎖定這80MB的內存,執行溢寫過程。map task的輸出結果還能夠往剩下的20MB內存中寫,互不影響。

7 當溢寫線程啓動後,須要對這80MB空間內的key作排序(sort)。排序是MapReduce模型默認的行爲,這裏的排序也是對序列化的字節作的排序

8 由於map task的輸出是須要發送到不一樣的reduce端去,而內存緩衝區沒有對將發送到相同reduce端的數據作合併,那麼這種合併應該是體如今磁盤文件中的。從官方圖上也能夠看到寫到磁盤中的一些文件是對不一樣的reduce端的數值作過合併。因此溢寫過程一個很重要的細節在於,若是有不少個key/value對須要發送到某個reduce端去,那麼須要將這些key/value值拼接到一塊,減小與partition相關的索引記錄。

  在針對每一個reduce端而合併數據時,有些數據可能像這樣:「aaa」/1,「aaa」/1。對於wordcount例子,只是簡單地統計單詞出現的次數,若是在同一個map task的結果中有不少像「aaa」同樣出現屢次的key,咱們就應該把它們的值合併到一塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只值reduce端執行從多個map task取數據作計算的過程。除reduce外,非正式地合併數據只能算做combine了。其實你們知道的,MapReduce中將Combiner等同於Reducer。

  若是client設置過Combiner,那麼如今就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減小溢寫到磁盤的數據量。Combiner會優化MapReduce的中間結果,因此它在整個模型中會屢次使用。那哪些場景才能使用Combiner呢?從這裏分析,Combiner的輸出是Reducer的輸入,Combiner毫不能改變最終的計算結果。因此從個人想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value類型徹底一致,且不影響最終結果的場景。好比累加,最大值等。Combiner的使用必定得慎重,若是用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

9 每次溢寫會在磁盤上生成一個溢寫文件,若是map的輸出結果然的很大,有屢次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩衝區中的數據也所有溢寫到磁盤中造成一個溢寫文件。最終磁盤中會至少有一個這樣的溢寫文件存在(若是map的輸出結果不多,當map執行完成時,只會產生一個溢寫文件),由於最終的文件只有一個,因此須要將這些溢寫文件歸併到一塊兒,這個過程就叫Merge。Merge是怎樣的?如前面的例子,「aaa」從某個map task讀取過來時值是5,從另一個map讀取時值是8,由於他們有相同的key,因此要merge成group

  什麼是group:對於「aaa」就是像真陽的:{「aaa」,[5,8,2,...]},數組中的值就是從不一樣的溢寫文件中讀取出來的,而後再把這些值加起來。請注意,由於merge是將多個溢寫文件合併到一個文件,因此可能也有相同的key存在,在這個過程當中,若是client設置過Combiner,也會使用Combiner來合併相同的key。

至此,map端的全部工做都已經結束,最終生成的這個文件也存放在TaskTracker夠獲得的某個本地目錄中。每一個reduce task不斷地經過RPC從JobTRacker那獲取map task是否完成的信息,若是reduce task獲得通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啓動

Reduce端的shuffle過程:

 

1 copy過程,簡單地拉取數據。Reduce進程啓動一些數據copy線程(Fetcher),經過http方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中

2 Merge階段。這裏的merge和map端的merge動做相同,只是數組中存放的是不一樣map端copy來的數值。copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端更爲靈活,它基於JVM的heap size設置,由於Shuffle階段Reducer不運行,因此應該把絕大部分的內存都給Shuffle使用

3 Merge有三種形式:一、內存到內存;二、內存到磁盤;三、磁盤到磁盤。默認狀況下第一種形式不啓用,讓人比較困惑。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map端相似,這也是溢寫的過程,在這個過程當中若是你設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的那個文件。

reduce端流程分析

1 reduce會接收到不一樣map任務傳來的數據,而且每一個map傳來的數據都是有序的。若是reduce端接收的數據量至關小,則直接存儲在內存中(緩衝區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用做此用途的堆空間百分比),若是數據量超過了該緩衝區大小的必定比例(由mapred.job.shuffle.merg.percent決定),則對數據合併後溢寫到磁盤中。

2 隨着溢寫文件的增多,後臺線程會將它們合併成一個更大的有序的文件,這樣作是爲了給後面的合併節省空間。其實無論在map端仍是在reduce端,MapReduce都是反覆地執行排序,合併操做,如今終於明白了有些人爲何會說:排序是hadoop的靈魂

3 合併的過程當中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據儘量地少,而且最後一次合併的結果並無寫入磁盤,而是直接輸入到reduce函數。

4 Reducer的輸入文件。不斷地merge後,最後會生成一個「最終文件」。爲何加引號?由於這個文件可能存在於磁盤上,也可能存在於內存中。對咱們來講,但願它存放於內存中,直接做爲Reducer的輸入,但默認狀況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。而後就是Reducer執行,把結果放到HDSF上。

注意:對MapReduce的調優在很大程度上就是對MapReduce Shuffle的性能的調優

3、內存緩衝區:MapOutputBuffer

兩級索引結構:

環形緩衝區:

1 kvoffsets緩衝區:也叫偏移量索引數組,用於保存key/value信息在位置索引kvindices中的偏移量。當kvoffsets的使用率超過io.sort.spill.percent(默認爲80%)後,便會觸發一次SpillThread線程的「溢寫」操做,也就是開始一次spill階段的操做。

2 kvindices緩衝區:也叫位置索引數組,用於保存key/value在數據緩衝區kvbuffer中的起始位置

3 kvbuffer數據緩衝區:用於保存實際的key/value的值。默認狀況下該緩衝區最多可使用io.sort.mb的95%,當kvbuffer使用率超過io.sort.spill.percent(默認80%)後,便會觸發一次SpillThread線程的「溢寫」操做,也就是開始一次spill階段的操做。

相關文章
相關標籤/搜索