MapReduce詳解及shuffle階段

  

hadoop1.xhadoop2.x的區別:

Hadoop1.x版本:java

內核主要由Hdfs和Mapreduce兩個系統組成,其中Mapreduce是一個離線分佈式計算框架,由一個JobTracker和多個TaskTracker組成。編程

JobTracker的主要做用:JobTracker是框架的中心,接收任務,計算資源,分配資源,分配任務,與DataNode進行交流等功能。決策程序失敗時 重啓等操做。又當爹又當媽。數組

TaskTracker同時監視當前機器上的task運行情況。TaskTracker須要把這些信息經過心跳,發送給jobTracker,jobTracker會收集這些信息以給新提交的job分配運行在那些機器上。app

存在問題:框架

1.JobTracker是mapreduce的集中處理點,存在單點故障;分佈式

2.JobTracker完成了太多任務,形成了過多資源的消耗,當mapreduce job很是多的時候,會形成很大的內存消耗,同時 也增長了JobTracker失效的風險,這也是業界廣泛總結出老的hadoop的mapreduce只能支持4000節點主機的上限。ide

 

Hadoop2.x版本:oop

第二代的hadoop版本,爲克服hadoop1.0中的hdfs和mapreduce存在的各類問題而提出的。針對hadoop1.x中的單NameNode制約HDFS的擴展性問題,提出了HDFS Federation,它讓多個NameNode分管不一樣的目錄進而實現訪問隔離和橫向擴展,同時它完全解決了NameNode單點故障問題,針對Hadoop1.0中的Mapreduce的Mapreduce在擴展性和多框架支持等方面不足。spa

MRv2具備與MRv1相同的編程模型和數據處理引擎,惟一不一樣的是運行時環境。MRv2是在MRv1基礎上經加工以後,運行於資源管理框架YARN之上的計算框架MapReduce。它的運行時環境再也不由JobTracker和TaskTracker等服務組成,而是變爲通用資源管理系統YARN和做業控制進程ApplicationMaster,其中,YARN負責資源管理和調度,而ApplicationMaster僅負責一個做業的管理。簡言之,MRv1僅是一個獨立的離線計算框架,而MRv2則是運行於YARN之上的MapReduce。線程

總體上:分爲兩個方面

1.任務調度和資源管理方面:

1)Hadoop1中的JobTracker是一個功能集中的部分,負責資源的分配和任務的分配,因此JobTracker單點出問題就會形成整個集羣沒法使用了,並且MapReduce模式是集成在Hadoop1中,不易分解,很差添加其餘模式;

2)Hadoop2中,ResourceManager(RM)就是負責資源的分配,NodeManager(NM)是從節點上管理資源的,而ApplicationMaster(AM)就是一個負責任務分配的組件,根據不一樣的模式有不一樣的AM,所以MapReduce模式有本身獨有的AM;

2.關於文件系統:

文件系統HDFS,1.x版本沒有HA功能,只能有一個NameNode;而2.x添加了HA部分,還能夠有多個NameNode同時運行,每一個負責集羣中的一部分。

Mapreduce流程包括shuffle階段

Mapreduce的過程總體上分爲四個階段:InputFormat MapTask ReduceTask OutPutFormat 固然中間還有shuffle階段

InputFormat:

咱們經過在runner類中用 job.setInputPaths 或者是addInputPath添加輸入文件或者是目錄(這二者是有區別的)不一樣的業務他們的輸入是不一樣的,我所完成的項目中使用了一個TableMapReduceUtilhbaseMapreduce的整合類)來設置的輸入目錄。

默認是FileInputFormat中的TextInputFormat類,獲取輸入分片,使用默認的RecordReaderLineRecordReader將一個輸入分片中的每一行按\n分割成key-value key是偏移量 value是每一行的內容。調用一次map()方法。一個輸入分片對應一個Maptask任務,

MapTask:

每個key-value通過map()方法業務處理以後開始開始shuffle階段

WordCount爲例:該階段只作+1的操做,(aaa,1),而後開向緩衝區寫入數據

 

Shuffle:

Map-Shuffle:

寫入以前先進行分區Partition,用戶能夠自定義分區(就是繼承Partitioner類),而後定製到job上,若是沒有進行分區,框架會使用 默認的分區(HashPartitioner)對keyhash值以後,而後在對reduceTaskNum進行取模(目的是爲了平衡reduce的處理能力),而後決定由那個reduceTask來處理。

將分完區的結果<key,value,partition>開始序列化成字節數組,開始寫入緩衝區。

隨着map端的結果不端的輸入緩衝區,緩衝區裏的數據愈來愈多,緩衝區的默認大小是100M,當緩衝區大小達到閥值時 默認是0.8spill.percent】(也就是80M,開始啓動溢寫線程,鎖定這80M的內存執行溢寫過程,內存—>磁盤,此時map輸出的結果繼續由另外一個線程往剩餘的20M裏寫,兩個線程相互獨立,彼此互不干擾。

溢寫spill線程啓動後,開始對key進行排序(Sort)默認的是天然排序,也是對序列化的字節數組進行排序(先對分區號排序,而後在對key進行排序)。

若是客戶端自定義了Combiner以後(至關於map階段的reduce),將相同的keyvalue相加,這樣的好處就是減小溢寫到磁盤的數據量(Combiner使用必定得慎重,適用於輸入key/value和輸出key/value類型徹底一致,並且不影響最終的結果)

每次溢寫都會在磁盤上生成一個一個的小文件,由於最終的結果文件只有一個,因此須要將這些溢寫文件歸併到一塊兒,這個過程叫作Merge,最終結果就是一個group({「aaa」,[5,8,3]})

集合裏面的值是從不一樣的溢寫文件中讀取來的。這時候Map-Shuffle就算是完成了。

一個MapTask端生成一個結果文件。

ReduceTask:

Reduce-Shuffle:

接下來開始進行Reduce-Shuffle 階段。當MapTask完成任務數超過總數的5%後,開始調度執行ReduceTask任務,而後ReduceTask默認啓動5copy線程到完成的MapTask任務節點上分別copy一份屬於本身的數據(使用Http的方式)。

這些拷貝的數據會首先保存到內存緩衝區中,當達到必定的閥值的時候,開始啓動內存到磁盤的Merge,也就是溢寫過程,一致運行直到map端沒有數據生成,最後啓動磁盤到磁盤的Merge方式生成最終的那個文件。在溢寫過程當中,而後鎖定80M的數據,而後在延續Sort過程,而後記性group(分組)將相同的key放到一個集合中,而後在進行Merge

而後就開始reduceTask就會將這個文件交給reduced()方法進行處理,執行相應的業務邏輯

OutputFormat:

默認輸出到HDFS上,文件名稱是part-00001

當咱們輸出須要指定到不一樣於HDFS時,須要自定義輸出類繼承OutputFormat

 

Mapreduce編程模型

 

public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

 

@Override

 

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {}

 

}

public static class MyReduce extends Reducer<Text, IntWritable, Text, Text> {

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {}

}

public class Runner extends Configured implements Tool {

 

@Override

public int run(String[] args) throws Exception {

 

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Runner(), args);

}

}

 

Mapreduce中的技術:

Job依賴:

應用背景:當某個需求使用一個Mapreduce程序沒法完成業務計算時,一般須要兩個mapreduce來配個完成 其實就是兩個job

關鍵代碼:

自定義數據類型:二次排序

Class MyWritable Implements WritableComparable<MyWritable>

重寫 write() readFile() compareTo() HashCode() equals()方法

自定義合併:

繼承Reducer 重寫reduce()方法

自定義分區:

繼承 Partitioner重寫getPartition()方法

自定義分組:比較字段是String類型

若是是Int類型就使用如下這種方法

添加緩衝文件:

   自定義多文件輸出:

查看MutipleFileOutput.java

相關文章
相關標籤/搜索