MapReduce的一點理解

對於MapReduce編程,大機率的流程用過的人或多或少都清楚,可是歸結到細節上,就有的地方不清楚了,下面根據本身的疑問,加上從網上各處,找到的被人的描述,最本身的疑問作出回答。算法

1. MapReduce 和 HDFS有什麼關係?

  首先,HDFS和MapReduce是Hadoop最核心的設計;編程

  對於HDFS,即Hadoop Distributed File System,它是Hadoop的存儲基礎,是數據層面的,提供海量的數據存儲;而MapReduce,則是一種引擎或者編程模型,能夠理解爲數據的上一層,咱們能夠經過編寫MapReduce程序,對海量的數據進行計算處理。這就相似於咱們經過 檢索(MapReduce)全部文件(HDFS),找到咱們想要的結果。數組

  其次,MapReduce中JobTracker和TaskTracker分別對應於HDFS中的NameNode和DataNode網絡

 

2.  MapReduce處理中,數據的流程是什麼?

在客戶端、JobTracker、TaskTracker的層次來分析MapReduce的工做原理的:  app

  a. 在客戶端啓動一個做業。函數

  b. 向JobTracker請求一個Job ID。oop

  c. 將運行做業所須要的資源文件複製到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在JobTracker專門爲該做業建立的文件夾中。文件夾名爲該做業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);輸入劃分信息告訴了JobTracker應該爲這個做業啓動多少個map任務等信息。線程

  d. JobTracker接收到做業後,將其放在一個做業隊列裏,等待做業調度器對其進行調度(這裏是否是很像微機中的進程調度呢,呵呵),看成業調度器根據本身的調度算法調度到該做業時,會根據輸入劃分信息爲每一個劃分建立一個map任務,並將map任務分配給TaskTracker執行。對於map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這裏須要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這裏有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包複製到該TaskTracker上來運行,這叫「運算移動,數據不移動」。而分配reduce任務時並不考慮數據本地化。設計

  e. TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着不少的信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。blog

     

若是具體從map端和reduce端分析,能夠參考上面的圖片,具體以下:

Map端: 

  a. 每一個輸入分片會讓一個map任務來處理,默認狀況下,以HDFS的一個塊的大小(默認爲64M)爲一個分片,固然咱們也能夠設置塊的大小。map輸出的結果會暫且放在一個環形內存緩衝區中(該緩衝區的大小默認爲100M,由io.sort.mb屬性控制),當該緩衝區快要溢出時(默認爲緩衝區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中建立一個溢出文件,將該緩衝區中的數據寫入這個文件。
  b. 在寫入磁盤以前,線程首先根據reduce任務的數目將數據劃分爲相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣作是爲了不有些reduce任務分配到大量數據,而有些reduce任務卻分到不多數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。而後對每一個分區中的數據進行排序,若是此時設置了Combiner,將排序後的結果進行Combia操做,這樣作的目的是讓儘量少的數據寫入到磁盤。
  c. 當map任務輸出最後一個記錄時,可能會有不少的溢出文件,這時須要將這些文件合併。合併的過程當中會不斷地進行排序和combia操做,目的有兩個:1.儘可能減小每次寫入磁盤的數據量;2.儘可能減小下一複製階段網絡傳輸的數據量。最後合併成了一個已分區且已排序的文件。爲了減小網絡傳輸的數據量,這裏能夠將數據壓縮,只要將mapred.compress.map.out設置爲true就能夠了。
  d. 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎麼知道它對應的reduce是哪一個呢?其實map任務一直和其父TaskTracker保持聯繫,而TaskTracker又一直和JobTracker保持心跳。因此JobTracker中保存了整個集羣中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就ok了哦。

  到這裏,map端就分析完了。那到底什麼是Shuffle呢?Shuffle的中文意思是「洗牌」,其實Shuffle也是一個很複雜的過程,這裏能夠參照下面的第三個問題。

Reduce端: 

  a.Reduce會接收到不一樣map任務傳來的數據,而且每一個map傳來的數據都是有序的。若是reduce端接受的數據量至關小,則直接存儲在內存中(緩衝區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用做此用途的堆空間的百分比),若是數據量超過了該緩衝區大小的必定比例(由mapred.job.shuffle.merge.percent決定),則對數據合併後溢寫到磁盤中。
  b.隨着溢寫文件的增多,後臺線程會將它們合併成一個更大的有序的文件,這樣作是爲了給後面的合併節省時間。其實無論在map端仍是reduce端,MapReduce都是反覆地執行排序,合併操做,如今終於明白了有些人爲何會說:排序是hadoop的靈魂。
  c.合併的過程當中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據儘量地少,而且最後一次合併的結果並無寫入磁盤,而是直接輸入到reduce函數。

 

3.   Map處理數據後,到Reduce獲得數據以前,數據的流程是什麼?

   其實,將map處理的結果,傳輸到reduce上的過程,在MapReduce中,能夠看作shuffle的過程。

  在map端,每一個map task都有一個內存緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉數據。 

  a.  在map task執行時,它的輸入數據來源於HDFS的block,固然在MapReduce概念中,map task只讀取split。Split與block的對應關係多是多對一,默認是一對一。

    b. 在通過mapper的運行後,咱們得知mapper的輸出是這樣一個key/value對。到底當前的key應該交由哪一個reduce去作呢,是須要如今決定的。 MapReduce提供Partitioner接口,它的做用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪一個reduce task處理。默認對key hash後再以reduce task數量取模。默認的取模方式只是爲了平均reduce的處理能力,若是用戶本身對Partitioner有需求,能夠訂製並設置到job上。接下來,須要將數據寫入內存緩衝區中,緩衝區的做用是批量收集map結果,減小磁盤IO的影響。咱們的key/value對以及Partition的結果都會被寫入緩衝區。固然寫入以前,key與value值都會被序列化成字節數組。

  c.  這個內存緩衝區是有大小限制的,默認是100MB。當map task的輸出結果不少時,就可能會撐爆內存,因此須要在必定條件下將緩衝區中的數據臨時寫入磁盤,而後從新利用這塊緩衝區。這個從內存往磁盤寫數據的過程被稱爲Spill,中文可譯爲溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩衝區寫map結果的線程。溢寫線程啓動時不該該阻止map的結果輸出,因此整個緩衝區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩衝區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啓動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還能夠往剩下的20MB內存中寫,互不影響。 
        當溢寫線程啓動後,須要對這80MB空間內的key作排序(Sort)。排序是MapReduce模型默認的行爲,這裏的排序也是對序列化的字節作的排序。 存緩衝區沒有對將發送到相同reduce端的數據作合併,那麼這種合併應該是體現是磁盤文件中的。從官方圖上也能夠看到寫到磁盤中的溢寫文件是對不一樣的reduce端的數值作過合併。因此溢寫過程一個很重要的細節在於,若是有不少個key/value對須要發送到某個reduce端去,那麼須要將這些key/value值拼接到一塊,減小與partition相關的索引記錄。 

    在map端的過程,可參考下圖: 

       

至此,map端的全部工做都已結束,最終生成的這個文件也存放在TaskTracker夠得着的某個本地目錄內。每一個reduce task不斷地經過RPC從JobTracker那裏獲取map task是否完成的信息,若是reduce task獲得通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啓動。 

        簡單地說,reduce task在執行以前的工做就是不斷地拉取當前job裏每一個map task的最終結果,而後對從不一樣地方拉取過來的數據不斷地作merge,也最終造成一個文件做爲reduce task的輸入文件。見下圖:

                 

   a. Copy過程,簡單地拉取數據。Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。 
   b. Merge階段。這裏的merge如map端的merge動做,只是數組中存放的是不一樣map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端的更爲靈活,它基於JVM的heap size設置,由於Shuffle階段Reducer不運行,因此應該把絕大部分的內存都給Shuffle用。這裏須要強調的是,merge有三種形式:1)內存到內存  2)內存到磁盤  3)磁盤到磁盤。默認狀況下第一種形式不啓用,讓人比較困惑,是吧。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map 端相似,這也是溢寫的過程,這個過程當中若是你設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的那個文件。 
     c. Reducer的輸入文件。不斷地merge後,最後會生成一個「最終文件」。爲何加引號?由於這個文件可能存在於磁盤上,也可能存在於內存中。對咱們來講,固然但願它存放於內存中,直接做爲Reducer的輸入,但默認狀況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。而後就是Reducer執行,把結果放到HDFS上。

    

上面的文章,部分是本身寫的,一部分摘自別人通俗易懂的理解,具體可參考:

  http://weixiaolu.iteye.com/blog/1474172

  http://langyu.iteye.com/blog/992916

相關文章
相關標籤/搜索