MapReduce是什麼?java
MapReduce源自於Google發表於2004年12月的MapReduce論文,是面向大數據並行處理的計算模型、框架和平臺,而Hadoop MapReduce是Google MapReduce克隆版。apache
若是沒有MapReduce!編程
- 那麼在分佈式計算上面將很難辦,很差編程。
- 在早期沒法處理大數據的離線計算。
- 編程中不易擴展性
- 分佈式計算任務一旦掛了,沒有容錯機制進行處理
說明:MapReduce不擅長的方面(慢!)網絡
- 實時計算:像MySQL同樣,在毫秒級或者秒級內返回結果。
- 流式計算:MapReduce的輸入數據集是靜態的,不能動態變化。
- DAG計算:多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出
如今MapReduce逐漸被Spark,Flink等框架取代。可是思想很重要,值得學習。架構
input階段,咱們取出文件中的一些數據
splitting階段,咱們將取出的單詞進行分片
Mapping階段,將每一個出現的單詞進行1次統計,轉換數據類型爲(單詞,1)
Shuffling階段,進行hash分片,放入對應的桶,俗稱洗牌,將一樣的單詞放入同一個桶。
Reducing階段,進行數據整合,求出每一個詞的出現的次數
Final result階段,最後獲取到的結果app
1 新建word.txt文件框架
Deer Bear River Car Car River Deer Car Bear
2 導入maven依賴maven
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency>
3 map類分佈式
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 四個泛型意思: * Mapper<LongWritable, Text, Text, IntWritable> * KEYIN -> LongWritable:偏移量(存儲該行在整個文件中的起始字節偏移量) * VALUEIN -> Text:進入數據類型 * KEYOUT -> Text:輸出數據鍵類型 * VALUEOUT -> IntWritable:輸出數據值類型 */ public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行數據,以空格切分 String[] words = value.toString().split(" "); //遍歷單詞數據,將數據變成(單詞,1)的形式放入上下文中(框架) for (String word : words) { this.word.set(word); context.write(this.word, one); } } }
4 reducer類ide
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 四個泛型意思: * Reducer<Text, IntWritable, Text, IntWritable> * KEYIN -> Text:輸入數據鍵類型 * VALUEIN -> IntWritable:輸入數據值類型 * KEYOUT -> Text:輸出數據鍵類型 * VALUEOUT -> IntWritable:輸出數據值類型 */ public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable total = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //累加相同單詞的數量 int sum = 0; for (IntWritable value : values) { sum += value.get(); } //包裝結果爲(單詞,總數)輸出 total.set(sum); context.write(key, total); } }
5 執行任務Driver類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WcDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //獲取Job實例 Job job = Job.getInstance(new Configuration()); //設置工做類 job.setJarByClass(WcDriver.class); //設置Mapper和Reducer類 job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); //設置Mapper和Reducer輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設置輸入輸出數據 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
6 設置文件輸入輸出參數,執行程序,獲得結果
#### 1.1.3 集羣運行
爲何hadoop要本身實現基本的數據類型而不直接使用Java的類?如:IntWritable,LongWritable,Text。
由於Java的序列化是一個重量級框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(校驗信息,繼承體系,Header等),在網絡中傳輸高效性有影響,因此hadoop本身實現了序列化機制(Writable)。
注:網絡傳輸中的信息都須要序列化,由於hadoop本身實現了序列化機制(Writable),因此咱們才能夠進行簡單的分佈式計算代碼開發。
1 新建flow.txt文件(行號 手機號 IP 網址 上行流量 下行流量 狀態碼)
1 13408542222 192.168.10.1 www.baidu.com 1000 2000 200 2 17358643333 192.168.10.1 www.baidu.com 2000 4000 200 3 13408542222 192.168.10.1 www.baidu.com 1000 2000 200 4 17358643333 192.168.10.1 www.baidu.com 2000 4000 200 5 13408542222 192.168.10.1 www.baidu.com 1000 2000 200 6 17358643333 192.168.10.1 www.baidu.com 2000 4000 200
2 導入maven依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.0.0-cdh6.2.0</version> </dependency>
3 實體類對象
import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @Getter @Setter @NoArgsConstructor @ToString //注意toString方法和最後打印結果效果相關 public class Flow implements Writable { private long upFlow; private long downFlow; private long totalFlow; public void setFlow(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.totalFlow = upFlow + downFlow; } //序列化方法 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(totalFlow); } //反序列化方法 @Override public void readFields(DataInput dataInput) throws IOException { upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); totalFlow = dataInput.readLong(); } }
4 map類
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> { private Text phone = new Text(); private Flow flow = new Flow(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] data = value.toString().split(" "); phone.set(data[1]); long upFlow = Long.parseLong(data[4]); long downFlow = Long.parseLong(data[5]); flow.setFlow(upFlow, downFlow); context.write(phone, flow); } }
5 reducer類
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text, Flow, Text, Flow> { private Flow flow = new Flow(); @Override protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0; for (Flow value : values) { sumUpFlow += value.getUpFlow(); sumDownFlow += value.getDownFlow(); } flow.setFlow(sumUpFlow, sumDownFlow); context.write(key, flow); } }
6 執行任務Driver類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //獲取Job實例 Job job = Job.getInstance(new Configuration()); //設置工做類 job.setJarByClass(FlowDriver.class); //設置Mapper和Reducer類 job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //設置Mapper和Reducer輸入輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Flow.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Flow.class); //設置輸入輸出數據 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
7 設置文件輸入輸出參數,執行程序,獲得結果
1.客戶端發送MR任務到RM上
2.RM分配資源,找到對應的NM,分配Container容器,啓動對應的Application Master
3.Application Master向Applications Manager註冊
4.Application Master向Resource Scheduler申請資源
5.找到對應的NM
6.分配Container容器,啓動對應的的Map Task或者是Reduce Task任務
7.Map Task和Reduce Task對Application Master彙報心跳,任務進度
8.Application Master向Applications Manager彙報總體任務進度,若是執行完了Applications Manager會將Application Master移除注意:原則上MapReduce分爲兩個階段:Map Task和Reduce Task,可是因爲Shuffling階段很重要,人爲劃分了Shuffling階段,這個階段發生在Map Task和Reduce Task之間,能夠理解爲由Map Task後半段和Reduce Task前半段組成。
1G的數據,分紅8份並行計算,那麼每一份須要計算的數據爲128M,感受還不錯。
1M的數據,分紅8份並行計算,那麼每一份須要計算的數據爲128B,感受資源浪費嚴重。
那麼就須要有一個東西來決定怎麼切分,它就是InputFormat,而切分大小通常由HDFS塊大小決定。
針對第四點說明:好比有3個文件,一個300M,第二個50M,第三個50M,那麼一共就是切了5個MapTask出來。
針對每個文件,第一個300M切了3個,第二個50M切了一個,第三個50M切了一個,共5個。
而若是隻有一個文件爲128M+1KB,那麼就只會切分一個,由於切片判斷規則爲->若是文件小於切片大小1.1倍,就和上一個切片將就放在一塊兒了,這樣能夠防止太小的切片在執行任務的時候,調度資源的時間超過執行時間的狀況。
鍵:存儲該行在整個文件中的起始字節偏移量,LongWritable類型
值:爲這行內容,不包括任何行止符(如回車,換行)
示例,一個分片中包含了以下記錄:
#源文件 si chuan cheng du jiang su wu xi he bei bei jing
#被TextInputFormat加載後會變成 (0,si chuan cheng du) (18,jiang su wu xi) (33,he bei bei jing)
能夠經過在驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");來設置分隔符。默認\t
示例,一個分片中包含了以下記錄:
#源文件 line1 si chuan cheng du line2 jiang su wu xi line3 he bei bei jing
#被KeyValueInputFormat加載後會變成 (line1,si chuan cheng du) (line2,jiang su wu xi) (line3,he bei bei jing)
鍵:存儲該行在整個文件中的起始字節偏移量,LongWritable類型
值:爲這行內容,不包括任何行止符(如回車,換行)
示例,一個分片中包含了以下記錄:
#源文件 si chuan cheng du jiang su wu xi he bei bei jing hu bei wu han
若是N是2,則每一個輸入分片包含2行,開啓2個Map Task
#第一個map收到 (0,si chuan cheng du) (18,jiang su wu xi) #第二個map收到 (33,he bei bei jing) (49,hu bei wu han)
根據設置的閾值來決定切片數。
假設setMaxInputSplitSize值爲5M,以下4個文件
a.txt 2.1M b.txt 5.8M c.txt 3.6M d.txt 7.8M #虛擬儲存過程(由於如上4小個文件在hdfs上佔用了4個塊,因此要有一個虛擬劃塊的過程) 2.1M < 5M 劃分一塊,2.1M 5.8M > 5M 大於5M可是小於2*5M,劃分2個一樣大小的塊,2.9M-2.9M 3.6M < 5M 劃分一塊,3.6M 12M > 5M 大於2*5M,先劃分5M,剩下的 7M > 5M 可是 < 2*5M 劃分2個一樣大小的塊,5M-3.5M-3.5M (獲得結果) 2.1M 2.9M 2.9M 3.6M 4M 3.5M 3.5M #切片過程(補夠5M劃成一塊) 第一塊 2.1M + 2.9M = 5M 第二塊 2.9M + 3.6M = 6.5M 第三塊 4M + 3.5M = 7.5M 第四塊 3.5M
Combiner是MR程序中Mapper和Reducer以外的一種組件。
Combiner組件的父類是Reducer。
Combiner和Reducer的區別是運行位置不一樣。
Combiner的意義就是對每個MapTask的輸出進行局部彙總,以減少網絡傳輸量。
Combiner可以應用的前提是不能影響最終的業務邏輯,且Combiner輸出KV要和Reducer的輸入KV對應。
Mapper Reducer 3 5 7 ->(3+5+7) / 3 = 5 (5+4) / 2 = 4.5 不等於 (3+5+7+2+6) / 5 = 4.6 2 6 ->(2+6) / 2 = 4