詳解MapReduce中的五大編程模型

前言

咱們上一節講了關於 MapReduce 中的應用場景和架構分析,最後還使用了一個CountWordDemo來進行演示,關於MapReduce的具體操做。若是還不瞭解的朋友能夠看看上篇文章:初識MapReduce的應用場景(附JAVA和Python代碼)html

接下來,咱們會講解關於MapReduce的編程模型,這篇文章的主要目的就是講清楚Mapreduce的編程模型有多少種,它們之間是怎麼協調合做的,會盡可能從源碼的角度來解析,最後就是講解不一樣的語言是如何調用Hadoop中的MapreduceAPI的。面試

目錄

  • MapReduce 編程模型的框架
  • 五種編程模型的詳解
    • InputFormat
    • OutPutFormat
    • Mapper
    • Reducer
    • Partitioner
  • 總結

MapReduce 編程模型的框架

咱們先來看一張圖,關於MapReduce的編程模型 算法

MapReduce`的框架圖

  • 用戶程序層

用戶程序層是指用戶用編寫好的代碼來調用MapReduce的接口層。apache

  • 工具層編程

    • Job control 是爲了監控Hadoop中的MapReduce向集羣提交複雜的做業任務,提交了任務到集羣中後,造成的任務是一個有向圖。每個任務都有兩個方法 submit()waitForCompletion(boolean)submit()方法是向集羣中提交做業,而後當即返回,waitForCompletion(boolean)就是等待集羣中的做業是否已經完成了,若是完成了,獲得的結果能夠看成下個任務的輸入。
    • chain Mapperchain Reducer 的這個模塊,是爲了用戶編寫鏈式做業,形式相似於 Map + Reduce Map *,表達的意思就是隻有一個Reduce,在Reduce的先後能夠有多個Map
    • Hadoop Streaming支持的是腳本語言,例Python、PHP等來調用Hadoop的底層接口,Hadoop Pipes 支持的是 C++來調用。
  • 編程接口層,這一層是所有由Java語言來實現的,若是是Java來開發的話,那麼能夠直接使用這一層。bash

詳解五種編程模型

InputFormat

做用

對輸入進入MapReduce的文件進行規範處理,主要包括InputSplitRecordReader兩個部分。TextOutputFormat是默認的文件輸入格式。網絡

InputForMat中的流程圖

InputSplit

這個是指對輸入的文件進行邏輯切割,切割成一對對Key-Value值。有兩個參數能夠定義InputSplit的塊大小,分別是mapred.max.split.size(記爲minSize)和mapred.min.split.size(記爲maxSize)。架構

RecordReader

是指做業在InputSplit中切割完成後,輸出Key-Value對,再由RecordReader進行讀取到一個個Mapper文件中。若是沒有特殊定義,一個Mapper文件的大小就是由Hadoopblock_size決定的,Hadoop 1.x中的block_size64M,在Hadoop 2.x中的 block_size的大小就是128Mapp

切割塊的大小

Hadoop2.x以上的版本中,一個splitSize的計算公式爲負載均衡

splitSize = max\{minSize,min\{maxSize, blockSize\}\}
複製代碼

OutputFormat

做用

對輸出的文件進行規範處理,主要的工做有兩個部分,一個是檢查輸出的目錄是否已經存在,若是存在的話就會報錯,另外一個是輸出最終結果的文件到文件系統中,TextOutputFormat是默認的輸出格式。

OnputForMat中的流程圖

OutputCommiter

OutputCommiter的做用有六點:

  • 做業(job)的初始化
//進行做業的初始化,創建臨時目錄。
//若是初始化成功,那麼做業就會進入到 Running 的狀態
public abstract void setupJob(JobContext var1) throws IOException;
複製代碼
  • 做業運行結束後就刪除做業
//若是這個job完成以後,就會刪除掉這個job。
//例如刪除掉臨時的目錄,而後會宣佈這個job處於如下的三種狀態之一,SUCCEDED/FAILED/KILLED
@Deprecated
    public void cleanupJob(JobContext jobContext) throws IOException {
    }
複製代碼
  • 初始化 Task
//初始化Task的操做有創建Task的臨時目錄
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
複製代碼
  • 檢查是否提交Task的結果
//檢查是否須要提交Task,爲的是Task不須要提交的時候提交出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
複製代碼
  • 提交Task
//任務結束的時候,須要提交任務
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
複製代碼
  • 回退Task
//若是Task處於KILLED或者FAILED的狀態,這Task就會進行刪除掉臨時的目錄
//若是這個目錄刪除不了(例如出現了異常後,處於被鎖定的狀態),另外一個一樣的Task會被執行
//而後使用一樣的attempt-id去把這個臨時目錄給刪除掉,也就說,必定會把臨時目錄給刪除乾淨
 public abstract void abortTask(TaskAttemptContext var1) throws IOException;

複製代碼

處理Task Side-Effect File

Hadoop中有一種特殊的文件和特殊的操做,那就是Side-Eddect File,這個文件的存在是爲了解決某一個Task由於網絡或者是機器性能的緣由致使的運行時間過長,從而致使拖慢了總體做業的進度,因此會爲每個任務在另外一個節點上再運行一個子任務,而後選擇二者中處理獲得的結果最快的那個任務爲最終結果,這個時候爲了不文件都輸入在同一個文件中,因此就把備胎任務輸出的文件取做爲 Side-Effect File

RecordWriter

這個是指輸出KEY-VALUE對到文件中。

Mapper和Reducer

詳解Mapper

InputFormat 爲每個 InputSplit 生成一個 map 任務,mapper的實現是經過job中的setMapperClass(Class)方法來配置寫好的map類,如這樣

//設置要執行的mapper類
job.setMapperClass(WordMapper.class);
複製代碼

其內部是調用了map(WritableComparable, Writable, Context)這個方法來爲每個鍵值對寫入到InputSplit,程序會調用cleanup(Context)方法來執行清理任務,清理掉不須要使用到的中間值。

關於輸入的鍵值對類型不須要和輸出的鍵值對類型同樣,並且輸入的鍵值對能夠映射到0個或者多個鍵值對。經過調用context.write(WritableComparable, Writable)來收集輸出的鍵值對。程序使用Counter來統計鍵值對的數量,

Mapper中的輸出被排序後,就會被劃分到每一個Reducer中,分塊的總數目和一個做業的reduce任務的數目是同樣的。

須要多少個Mapper任務

關於一個機器節點適合多少個map任務,官方的文檔的建議是,一個節點有10個到100個任務是最好的,若是是cpu低消耗的話,300個也是能夠的,最合理的一個map任務是須要運行超過1分鐘。

詳解Reducer

Reducer任務的話就是將Mapper中輸出的結果進行統計合併後,輸出到文件系統中。 用戶能夠自定義Reducer的數量,使用Job.setNumReduceTasks(int)這個方法。 在調用Reducer的話,使用的是Job.setReducerClass(Class)方法,內部調用的是reduce(WritableComparable, Iterable<Writable>, Context)這個方法,最後,程序會調用cleanup(Context)來進行清理工做。如這樣:

//設置要執行的reduce類
job.setReducerClass(WordReduce.class);
複製代碼

Reducer其實是分三個階段,分別是ShuffleSortSecondary Sort

shuffle

這個階段是指Reducer的輸入階段,系統會爲每個Reduce任務去獲取全部的分塊,經過的是HTTP的方式

sort

這個階段是指在輸入Reducer階段的值進行分組,sortshuffle是同時進行的,能夠這麼理解,一邊在輸入的時候,同時在一邊排序。

Secondary Sort

這個階段不是必需的,只有在中間過程當中對key的排序和在reduce的輸入以前對key的排序規則不一樣的時候,纔會啓動這個過程,能夠經過的是Job.setSortComparatorClass(Class)來指定一個Comparator進行排序,而後再結合Job.setGroupingComparatorClass(Class)來進行分組,最後能夠實現二次排序。

在整個reduce中的輸出是沒有排序

須要多少個 Reducer 任務

建議是0.95或者是1.75*mapred.tasktracker.reduce.tasks.maximum。若是是0.95的話,那麼就能夠在mapper任務結束時,立馬就能夠啓動Reducer任務。若是是1.75的話,那麼運行的快的節點就能夠在map任務完成的時候先計算一輪,而後等到其餘的節點完成的時候就能夠計算第二輪了。固然,Reduce任務的個數不是越多就越好的,個數多會增長系統的開銷,可是能夠在提高負載均衡,從而下降因爲失敗而帶來的負面影響。

Partitioner

這個模塊用來劃分鍵值空間,控制的是map任務中的key值分割的分區,默認使用的算法是哈希函數,HashPartitioner是默認的Partitioner

總結

這篇文章主要就是講了MapReduce的框架模型,分別是分爲用戶程序層、工具層、編程接口層這三層,在編程接口層主要有五種編程模型,分別是InputFomatMapperReducePartitionerOnputFomatReducer。主要是偏理論,代碼的參考例子能夠參考官方的例子:WordCount_v2.0

這是MapReduce系列的第二篇,接下來的一篇會詳細寫關於MapReduce的做業配置和環境,結合一些面試題的彙總,因此接下來的這篇仍是乾貨滿滿的,期待着就行了。

更多幹貨,歡迎關注個人公衆號:spacedong

image
相關文章
相關標籤/搜索