MapReduce與Yarn 的詳細工做流程分析

MapReduce詳細工做流程之Map階段

MR一

如上圖所示服務器

  1. 首先有一個200M的待處理文件
  2. 切片:在客戶端提交以前,根據參數配置,進行任務規劃,將文件按128M每塊進行切片
  3. 提交:提交能夠提交到本地工做環境或者Yarn工做環境,本地只須要提交切片信息和xml配置文件,Yarn環境還須要提交jar包;本地環境通常只做爲測試用
  4. 提交時會將每一個任務封裝爲一個job交給Yarn來處理(詳細見後邊的Yarn工做流程介紹),計算出MapTask數量(等於切片數量),每一個MapTask並行執行
  5. MapTask中執行Mapper的map方法,此方法須要k和v做爲輸入參數,因此會首先獲取kv值;
    • 首先調用InputFormat方法,默認爲TextInputFormat方法,在此方法調用createRecoderReader方法,將每一個塊文件封裝爲k,v鍵值對,傳遞給map方法
  6. map方法首先進行一系列的邏輯操做,執行完成後最後進行寫操做
  7. map方法若是直接寫給reduce的話,至關於直接操做磁盤,太多的IO操做,使得效率過低,因此在map和reduce中間還有一個shuffle操做
    • map處理完成相關的邏輯操做以後,首先經過outputCollector向環形緩衝區寫入數據,環形緩衝區主要兩部分,一部分寫入文件的元數據信息,另外一部分寫入文件的真實內容
    • 環形緩衝區的默認大小是100M,當緩衝的容量達到默認大小的80%時,進行反向溢寫
  8. 在溢寫以前會將緩衝區的數據按照指定的分區規則進行分區和排序,之因此反向溢寫是由於這樣就能夠邊接收數據邊往磁盤溢寫數據
  9. 在分區和排序以後,溢寫到磁盤,可能發生屢次溢寫,溢寫到多個文件
  10. 對全部溢寫到磁盤的文件進行歸併排序
  11. 在9到10步之間還能夠有一個Combine合併操做,意義是對每一個MapTask的輸出進行局部彙總,以減小網絡傳輸量
    • Map階段的進程數比Reduce階段要多,因此放在Map階段處理效率更高
    • Map階段合併以後,傳遞給Reduce的數據就會少不少
    • 可是Combiner可以應用的前提是不能影響最終的業務邏輯,並且Combiner的輸出kv要和Reduce的輸入kv類型對應起來

      整個MapTask分爲Read階段,Map階段,Collect階段,溢寫(spill)階段和combine階段網絡

      - Read階段:MapTask經過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/valueapp

      - Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value函數

      - Collect收集階段:在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩衝區中oop

      - Spill階段:即「溢寫」,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做測試

MapReduce詳細工做流程之Reduce階段

MR二

如上圖所示線程

  1. 全部的MapTask任務完成後,啓動相應數量的ReduceTask(和分區數量相同),並告知ReduceTask處理數據的範圍
  2. ReduceTask會將MapTask處理完的數據拷貝一份到磁盤中,併合並文件和歸併排序
  3. 最後將數據傳給reduce進行處理,一次讀取一組數據
  4. 最後經過OutputFormat輸出

整個ReduceTask分爲Copy階段,Merge階段,Sort階段(Merge和Sort能夠合併爲一個),Reduce階段。code

- Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中orm

- Merge階段:在遠程拷貝數據的同時,ReduceTask啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多cdn

- Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行彙集的一組數據。爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可

- Reduce階段:reduce()函數將計算結果寫到HDFS上

Shuffle機制

Map方法以後,Reduce方法以前的數據處理過程稱之爲Shuffle。shuffle流程詳解以下:

  1. MapTask收集map()方法輸出的kv對,放到環形緩衝區中
  2. 從環形緩衝區不斷溢出到本地磁盤文件,可能會溢出多個文件
  3. 多個溢出文件會被合併成大的溢出文件
  4. 在溢出過程及合併的過程當中,都要調用Partitioner進行分區和針對key進行排序
  5. ReduceTask根據本身的分區號,去各個MapTask機器上取相應的結果分區數據
  6. ReduceTask將取到的來自同一個分區不一樣MapTask的結果文件進行歸併排序
  7. 合併成大文件後,shuffle過程也就結束了,進入reduce方法

Yarn工做機制

Yarn工做機制

job提交全過程

  1. MR程序提交到客戶端所在的節點,YarnRunner向ResourceManager申請一個Application
  2. RM將該Application的資源路徑和做業id返回給YarnRunner
  3. YarnRunner將運行job所需資源提交到HDFS上
  4. 程序資源提交完畢後,申請運行mrAppMaster
  5. RM將用戶的請求初始化成一個Task
  6. 其中一個NodeManager領取到Task任務
  7. 該NodeManager建立容器Container,併產生MRAppmaster
  8. Container從HDFS上拷貝資源到本地
  9. MRAppmaster向RM 申請運行MapTask資源
  10. RM將運行MapTask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並建立容器
  11. MR向兩個接收到任務的NodeManager發送程序啓動腳本,這兩個NodeManager分別啓動MapTask,MapTask對數據分區排序
  12. MrAppMaster等待全部MapTask運行完畢後,向RM申請容器,運行ReduceTask
  13. ReduceTask向MapTask獲取相應分區的數據
  14. 程序運行完畢後,MR會向RM申請註銷本身

進度和狀態更新:

YARN中的任務將其進度和狀態(包括counter)返回給應用管理器, 客戶端每秒(經過mapreduce.client.progressmonitor.pollinterval設置)嚮應用管理器請求進度更新, 展現給用戶

做業完成:

除了嚮應用管理器請求做業進度外, 客戶端每5秒都會經過調用waitForCompletion()來檢查做業是否完成。時間間隔能夠經過mapreduce.client.completion.pollinterval來設置。做業完成以後, 應用管理器和Container會清理工做狀態。做業的信息會被做業歷史服務器存儲以備以後用戶覈查

歡迎關注下方公衆號,獲取更多文章信息

1
相關文章
相關標籤/搜索