Source code: https://github.com/hiszm/hadoop-train
MapReduce is a distributed computing framework for writing batch-processing applications. A finished program can be submitted to a Hadoop cluster to process large data sets in parallel. A MapReduce job splits the input data set into independent chunks, which are processed by map tasks in parallel; the framework sorts the outputs of the maps and then feeds them into the reduce tasks.
When writing MapReduce programs, what we mainly need to think about is how the Splitting is done and how to Reduce.
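In practice the splitting itself is carried out by the job's InputFormat rather than by code we write; what we control is its configuration. A minimal sketch, assuming Hadoop 2.x defaults, where TextInputFormat produces roughly one split per HDFS block and the framework runs one map task per split:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitConfigDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Line-oriented splitting: each map() call sees one line of input.
        job.setInputFormatClass(TextInputFormat.class);
        // We usually only tune the bounds of the splitting, for example:
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024); // 128 MB
    }
}
```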
The MapReduce framework operates exclusively on <key,value> pairs: it views the input to a job as a set of <key,value> pairs and produces a set of <key,value> pairs as the output.
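As a minimal sketch of what such a pair looks like (the word-count types Text and IntWritable are an assumption carried through the rest of this article): keys and values are Hadoop Writable types, so the framework can serialize them between the map and reduce phases.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class KeyValuePairDemo {
    public static void main(String[] args) {
        // A map-output pair <k2, v2> in word count: a word and the count 1.
        Text word = new Text("hadoop");
        IntWritable one = new IntWritable(1);
        System.out.println(word + "\t" + one); // prints: hadoop	1
    }
}
```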
MapReduce splits a job into a Map phase and a Reduce phase.
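A minimal driver sketch showing how the two phases are wired together and submitted (the class names WordCountDriver, WordCountMapper, and WordCountReducer are assumptions; the mapper and reducer themselves are sketched after the walkthrough below):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);   // Map phase
        job.setReducerClass(WordCountReducer.class); // Reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // job input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // job output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```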
Taking word count as the running example, the job proceeds as follows:

- input: read the text file;
- splitting: split the file by line; at this point K1 is the position of the line (with the default TextInputFormat, the byte offset at which the line starts) and V1 is the text content of that line;
- mapping: in parallel, split each line on spaces, producing List(K2, V2), where K2 is a single word and, since we are counting word frequencies, V2 is 1, meaning one occurrence;
- shuffling: because the mapping operations may run in parallel on different machines, shuffling is needed to send all data with the same key to the same node to be merged; only then can the final counts be computed. After shuffling, K2 is a single word, List(V2) is an iterable collection, and each V2 is a V2 from the mapping step;
- reducing: reduce List(V2) by summing it, and write out the final result (see the sketch after this list).

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Mapper
```java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    // Called once at the beginning of the task.
    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Called once for each key/value pair in the input split.
    // The default implementation is the identity function.
    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    // Called once at the end of the task.
    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Drives the whole task: setup once, map per input record, cleanup once.
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while (context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
```
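Note the template-method shape of run(): setup() once, map() once per record, cleanup() once, so a subclass only overrides the hooks it needs. As a hedged sketch (the stop-word logic is an assumption for illustration, not part of the Hadoop API), a mapper that uses setup() for one-time initialization:

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: build the stop-word set once per task in setup(),
// instead of rebuilding it on every call to map().
public class StopWordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    private final Set<String> stopWords = new HashSet<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        stopWords.add("the"); // in practice this might be loaded from a file
        stopWords.add("a");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(" ")) {
            if (!token.isEmpty() && !stopWords.contains(token)) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}
```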
Reducer
```java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    // Called once at the beginning of the task.
    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Called once per key, with all of that key's values. The default
    // implementation is the identity function: every value is written out.
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        for (VALUEIN value : values) {
            context.write(key, value);
        }
    }

    // Called once at the end of the task.
    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    // Drives the whole task: setup once, reduce per distinct key, cleanup once.
    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while (context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                // Reset the value iterator's backup store between keys.
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator<VALUEIN>) iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }
    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
```
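Since the default reduce() is an identity that passes every value through unchanged, aggregation happens only when we override it. A hedged sketch (the max-per-key task is an assumption for illustration) of a non-trivial override:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical example: keep only the maximum value observed for each key.
public class MaxValueReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable max = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int best = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            best = Math.max(best, v.get()); // scan all values shuffled to this key
        }
        max.set(best);
        context.write(key, max);
    }
}
```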
MapReduce programming model: execution steps
MapReduce programming model: core concepts