咱們上一節講了關於 MapReduce
中的應用場景和架構分析,最後還使用了一個CountWord
的Demo
來進行演示,關於MapReduce
的具體操做。若是還不瞭解的朋友能夠看看上篇文章:初識MapReduce的應用場景(附JAVA和Python代碼)html
接下來,咱們會講解關於MapReduce
的編程模型,這篇文章的主要目的就是講清楚Mapreduce
的編程模型有多少種,它們之間是怎麼協調合做的,會盡可能從源碼的角度來解析,最後就是講解不一樣的語言是如何調用Hadoop
中的Mapreduce
的API
的。面試
咱們先來看一張圖,關於MapReduce
的編程模型 算法
用戶程序層是指用戶用編寫好的代碼來調用MapReduce
的接口層。apache
工具層編程
Hadoop
中的MapReduce
向集羣提交複雜的做業任務,提交了任務到集羣中後,造成的任務是一個有向圖。每個任務都有兩個方法 submit()
和waitForCompletion(boolean)
,submit()
方法是向集羣中提交做業,而後當即返回,waitForCompletion(boolean)
就是等待集羣中的做業是否已經完成了,若是完成了,獲得的結果能夠看成下個任務的輸入。chain Mapper
和 chain Reducer
的這個模塊,是爲了用戶編寫鏈式做業,形式相似於 Map + Reduce Map *
,表達的意思就是隻有一個Reduce
,在Reduce
的先後能夠有多個Map
Hadoop Streaming
支持的是腳本語言,例Python、PHP等來調用Hadoop
的底層接口,Hadoop Pipes
支持的是 C++
來調用。編程接口層,這一層是所有由Java
語言來實現的,若是是Java
來開發的話,那麼能夠直接使用這一層。bash
對輸入進入MapReduce
的文件進行規範處理,主要包括InputSplit
和RecordReader
兩個部分。TextOutputFormat
是默認的文件輸入格式。網絡
這個是指對輸入的文件進行邏輯切割,切割成一對對Key-Value
值。有兩個參數能夠定義InputSplit
的塊大小,分別是mapred.max.split.size
(記爲minSize
)和mapred.min.split.size
(記爲maxSize
)。架構
是指做業在InputSplit
中切割完成後,輸出Key-Value
對,再由RecordReader
進行讀取到一個個Mapper
文件中。若是沒有特殊定義,一個Mapper
文件的大小就是由Hadoop
的block_size
決定的,Hadoop 1.x
中的block_size
是64M
,在Hadoop 2.x
中的 block_size
的大小就是128M
。app
在Hadoop2.x
以上的版本中,一個splitSize
的計算公式爲負載均衡
splitSize = max\{minSize,min\{maxSize, blockSize\}\}
複製代碼
對輸出的文件進行規範處理,主要的工做有兩個部分,一個是檢查輸出的目錄是否已經存在,若是存在的話就會報錯,另外一個是輸出最終結果的文件到文件系統中,TextOutputFormat
是默認的輸出格式。
OutputCommiter
的做用有六點:
job
)的初始化//進行做業的初始化,創建臨時目錄。
//若是初始化成功,那麼做業就會進入到 Running 的狀態
public abstract void setupJob(JobContext var1) throws IOException;
複製代碼
//若是這個job完成以後,就會刪除掉這個job。
//例如刪除掉臨時的目錄,而後會宣佈這個job處於如下的三種狀態之一,SUCCEDED/FAILED/KILLED
@Deprecated
public void cleanupJob(JobContext jobContext) throws IOException {
}
複製代碼
Task
//初始化Task的操做有創建Task的臨時目錄
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
複製代碼
Task
的結果//檢查是否須要提交Task,爲的是Task不須要提交的時候提交出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
複製代碼
Task
//任務結束的時候,須要提交任務
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
複製代碼
Task
//若是Task處於KILLED或者FAILED的狀態,這Task就會進行刪除掉臨時的目錄
//若是這個目錄刪除不了(例如出現了異常後,處於被鎖定的狀態),另外一個一樣的Task會被執行
//而後使用一樣的attempt-id去把這個臨時目錄給刪除掉,也就說,必定會把臨時目錄給刪除乾淨
public abstract void abortTask(TaskAttemptContext var1) throws IOException;
複製代碼
在Hadoop
中有一種特殊的文件和特殊的操做,那就是Side-Eddect File
,這個文件的存在是爲了解決某一個Task
由於網絡或者是機器性能的緣由致使的運行時間過長,從而致使拖慢了總體做業的進度,因此會爲每個任務在另外一個節點上再運行一個子任務,而後選擇二者中處理獲得的結果最快的那個任務爲最終結果,這個時候爲了不文件都輸入在同一個文件中,因此就把備胎任務輸出的文件取做爲 Side-Effect File
這個是指輸出KEY-VALUE
對到文件中。
InputFormat
爲每個 InputSplit
生成一個 map
任務,mapper
的實現是經過job
中的setMapperClass(Class)
方法來配置寫好的map
類,如這樣
//設置要執行的mapper類
job.setMapperClass(WordMapper.class);
複製代碼
其內部是調用了map(WritableComparable, Writable, Context)
這個方法來爲每個鍵值對寫入到InputSplit
,程序會調用cleanup(Context)
方法來執行清理任務,清理掉不須要使用到的中間值。
關於輸入的鍵值對類型不須要和輸出的鍵值對類型同樣,並且輸入的鍵值對能夠映射到0個或者多個鍵值對。經過調用context.write(WritableComparable, Writable)
來收集輸出的鍵值對。程序使用Counter
來統計鍵值對的數量,
在Mapper
中的輸出被排序後,就會被劃分到每一個Reducer
中,分塊的總數目和一個做業的reduce
任務的數目是同樣的。
關於一個機器節點適合多少個map
任務,官方的文檔的建議是,一個節點有10
個到100
個任務是最好的,若是是cpu
低消耗的話,300
個也是能夠的,最合理的一個map任務是須要運行超過1
分鐘。
Reducer
任務的話就是將Mapper
中輸出的結果進行統計合併後,輸出到文件系統中。 用戶能夠自定義Reducer
的數量,使用Job.setNumReduceTasks(int)
這個方法。 在調用Reducer
的話,使用的是Job.setReducerClass(Class)
方法,內部調用的是reduce(WritableComparable, Iterable<Writable>, Context)
這個方法,最後,程序會調用cleanup(Context)
來進行清理工做。如這樣:
//設置要執行的reduce類
job.setReducerClass(WordReduce.class);
複製代碼
Reducer
其實是分三個階段,分別是Shuffle
、Sort
和Secondary Sort
。
這個階段是指Reducer
的輸入階段,系統會爲每個Reduce
任務去獲取全部的分塊,經過的是HTTP
的方式
這個階段是指在輸入Reducer
階段的值進行分組,sort
和shuffle
是同時進行的,能夠這麼理解,一邊在輸入的時候,同時在一邊排序。
這個階段不是必需的,只有在中間過程當中對key
的排序和在reduce
的輸入以前對key
的排序規則不一樣的時候,纔會啓動這個過程,能夠經過的是Job.setSortComparatorClass(Class)
來指定一個Comparator
進行排序,而後再結合Job.setGroupingComparatorClass(Class)
來進行分組,最後能夠實現二次排序。
在整個reduce
中的輸出是沒有排序
建議是0.95
或者是1.75
*mapred.tasktracker.reduce.tasks.maximum
。若是是0.95
的話,那麼就能夠在mapper
任務結束時,立馬就能夠啓動Reducer
任務。若是是1.75
的話,那麼運行的快的節點就能夠在map
任務完成的時候先計算一輪,而後等到其餘的節點完成的時候就能夠計算第二輪了。固然,Reduce
任務的個數不是越多就越好的,個數多會增長系統的開銷,可是能夠在提高負載均衡,從而下降因爲失敗而帶來的負面影響。
這個模塊用來劃分鍵值空間,控制的是map
任務中的key
值分割的分區,默認使用的算法是哈希函數,HashPartitioner
是默認的Partitioner
。
這篇文章主要就是講了MapReduce
的框架模型,分別是分爲用戶程序層、工具層、編程接口層這三層,在編程接口層主要有五種編程模型,分別是InputFomat
、MapperReduce
、Partitioner
、OnputFomat
和Reducer
。主要是偏理論,代碼的參考例子能夠參考官方的例子:WordCount_v2.0
這是MapReduce
系列的第二篇,接下來的一篇會詳細寫關於MapReduce
的做業配置和環境,結合一些面試題的彙總,因此接下來的這篇仍是乾貨滿滿的,期待着就行了。
更多幹貨,歡迎關注個人公衆號:spacedong