MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算,用於解決海量數據的計算問題。java
MapReduce分兩部分組成web
①映射(Mapping):對集合裏面的每個目標進行相同的操做,好比你要將一個表單裏面的每一個單元格作乘以2的操做,那麼你就能夠將乘以2這個方法應用到表單裏面的每一個單元格上面。apache
②化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裏一列數字的和這個任務屬於reducing。編程
執行過程:你向MapReduce框架提交一個計算做業時,它會首先把計算做業拆分紅若干個Map任務,而後分配到不一樣的節點上去執行,每個Map任務處理輸入數據中的一部分,當Map任務完成後,它會生成一些中間文件,這些中間文件將會做爲Reduce任務的輸入數據。Reduce任務的主要目標就是把前面若干個Map的輸出彙總到一塊兒並輸出。app
①Map函數:(k1 : v1) -->[(k2 : v2)]框架
→輸入:鍵值對(k1 : v1)表示的數據。ide
→處理:文檔數據記錄(如文本文件中的一行)以鍵值對的形式傳入map函數,處理完成以後以另外一種鍵值對的形式輸出處理結果[(k2 : v2)]。函數
→輸出:鍵值對[(k2 : v2)]表示的一組中間數據。oop
②Reduce函數:(k2 : [v2]) --> [(k3 : k4)]測試
→輸入:map輸出的一組鍵值對[(k2 : v2)]將被進行合併處理將一樣主鍵下的不一樣值合併到一個列表[v2]中,故reduce的輸入爲(k2 : [v2])。
→處理:對傳入的中間結果列表數據進行某種整理或進一步處理,並輸出最終的某種的鍵值對形式的輸出結果[(k3 : k4)]。
→輸出:鍵值對[(k3 : k4)]表示最終數據。
注意:各個map函數對所劃分的數據進行並行處理,從不一樣的輸入數據產生不一樣的輸出數據。進行reduce處理以前必須等到全部的map函數作完。最終彙總全部的reduce的輸出結果便可得到最終結果。
1,sean
2,bob
3,sean
4,bob
5,jf
從上面的數據分析出,咱們須要的是一行數據中的後一個數據,在map函數中,輸入端v1表明的是一行數據,輸出端的k2能夠表明是被統計的姓名。在reduce函數中,k2仍是被統計的姓名,而[v2]是一個數據集,這裏是將k2相同的鍵的v2數據合併起來。輸出的是本身須要的數據k3表明的是統計的姓名,v3是姓名出現的次數。
代碼實現:
解析文件數據
package com.jf.mapreduce; import org.apache.hadoop.io.Text; public class NameRecordParser { private String nameId; private String name; private boolean valid; // 解析每行數據 public void parse(String line) { String[] strs = line.split(","); if (strs.length == 2) { nameId = strs[0].trim(); name = strs[1].trim(); if (nameId.length() > 0 && name.length() > 0) { valid = true; } } } public void parse(Text line) { parse(line.toString()); } public String getNameId() { return nameId; } public void setNameId(String nameId) { this.nameId = nameId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } }
MapReduce處理
package com.jf.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class NameReference extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = getConf(); String input = conf.get("input"); String output = conf.get("output"); // 構建做業配置 Job job = Job.getInstance(conf, "NameReference"); // 設置做業所要執行的類 job.setJarByClass(this.getClass()); // 設置自定義的mapper類,以及tapper類的輸出key和value類型。 job.setMapperClass(NameMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置自定義的Reducer類以及輸出時的類型 job.setReducerClass(NameReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置讀取最原始數據的格式信息以及 // 數據輸出到HDFS集羣中的格式信息 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job, new Path(input)); TextOutputFormat.setOutputPath(job, new Path(output)); return job.waitForCompletion(true) ? 0 : 1; } private static class NameMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private NameRecordParser parser = null; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { this.parser = new NameRecordParser(); parser.parse(value); System.out.println(value); if (parser.isValid()) { Text resultKey = new Text(parser.getName()); IntWritable resultValue = new IntWritable(1); System.out.println("map:resultKey=" + resultKey.toString() + ",value=" + resultValue.get()); context.write(resultKey, resultValue); } } } private static class NameReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable intWritable : values) { count += intWritable.get(); } System.out.println("reduce:key=" + key + ",value=" + count); context.write(key, new IntWritable(count)); } } public static void main(String[] args) { try { System.exit(ToolRunner.run(new NameReference(), args)); } catch (Exception e) { e.printStackTrace(); } } }
新建測試數據文件
提交文件到hadoop文件系統中
確認文件提交成功
執行MapReduce分析數據
能夠經過web查看執行進度
http://192.168.1.113:8088/cluster/apps
查看執行結果
也能夠經過日誌查看執行過程
014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
014399999999999/1994-07-31/35
提交數據文件到文件系統中
代碼解析數據文件
package com.jf.mapreduce; import org.apache.hadoop.io.Text; public class WeatherRecordParser { private String stationId; private String year; private int temperature = -999; private boolean valid; /** * 解析數據 * * @param line * 氣象站/年月日/溫度 * 014399999999999/1992-01-31/10 */ public void parse(String line) { String[] strs = line.split("/"); if (strs.length == 3) { if (strs[0] != null && strs[0].length() > 0) { stationId = strs[0]; } if (strs[1] != null && strs[1].length() > 0) { year = strs[1].substring(0, 4); } if (strs[2] != null && strs[2].length() > 0) { temperature = Integer.parseInt(strs[2]); } if (stationId != null && year != null & temperature > -999) { valid = true; } } } public void parse(Text value) { parse(value.toString()); } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } public String getYear() { return year; } public void setYear(String year) { this.year = year; } public int getTemperature() { return temperature; } public void setTemperature(int temperature) { this.temperature = temperature; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } }
統計每一年最大氣溫
package com.jf.mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 獲取每一年最高溫度 * * @author Administrator * */ public class MaxTemperatureByYear extends Configured implements Tool { private static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private WeatherRecordParser parser = null; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { parser = new WeatherRecordParser(); parser.parse(value); if (parser.isValid()) { Text resultKey = new Text(parser.getYear()); IntWritable resultValue = new IntWritable(parser.getTemperature()); System.out.println("map:resultKey=" + resultKey + ",resultValue=" + resultValue); context.write(resultKey, resultValue); } } } private static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 獲取相同key中最大的值 int max = Integer.MIN_VALUE; for (IntWritable intWritable : values) { if (intWritable.get() > max) { max = intWritable.get(); } } System.out.println("reducer:key=" + key + ",value=" + max); context.write(key, new IntWritable(max)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); // 構建做業所處理數據的輸入輸出路徑 Path input = new Path(conf.get("input")); Path output = new Path(conf.get("output")); // 構建做業配置 Job job = Job.getInstance(conf, MaxTemperatureByYear.class.getName()); // 設置做業所要執行的類 job.setJarByClass(MaxTemperatureByYear.class); // 設置自定義的mapper類,以及tapper類的輸出key和value類型。 job.setMapperClass(MaxTempMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置自定義的Reducer類以及輸出時的類型 job.setReducerClass(MaxTempReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置讀取最原始數據的格式信息,以及數據輸出到HDFS集羣中的格式信息 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job, input); TextOutputFormat.setOutputPath(job, output); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new MaxTemperatureByYear(), args)); } }
提交任務執行
File System Counters FILE: Number of bytes read=237 FILE: Number of bytes written=233027 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=755 HDFS: Number of bytes written=168 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=7929 Total time spent by all reduces in occupied slots (ms)=8445 Total time spent by all map tasks (ms)=7929 Total time spent by all reduce tasks (ms)=8445 Total vcore-seconds taken by all map tasks=7929 Total vcore-seconds taken by all reduce tasks=8445 Total megabyte-seconds taken by all map tasks=8119296 Total megabyte-seconds taken by all reduce tasks=8647680 Map-Reduce Framework Map input records=21 Map output records=21 Map output bytes=189 Map output materialized bytes=237 Input split bytes=106 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=237 Reduce input records=21 Reduce output records=21 Spilled Records=42 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=221 CPU time spent (ms)=2330 Physical memory (bytes) snapshot=310419456 Virtual memory (bytes) snapshot=1687691264 Total committed heap usage (bytes)=164040704 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=649 File Output Format Counters Bytes Written=168
能夠經過web頁面查看執行狀態
查看執行日誌
map計算日誌輸出
reduce計算統計日誌
在文件系統中查看執行結果
修改配置文件:yarn-site.xml
從新執行時能夠看到reduce個數爲2
測試執行
任務分配:
①假設咱們有一個HDFS集羣有4個節點分別是us1,us2,us3,us4。Yarn集羣的主節點在分配資源的時候,當你客戶端將做業提交的時候,resourcemanager在分配資源(或者說分配做業)的時候,儘可能將應用程序分發到有數據的節點上。這樣就避免了數據在節點與節點之間傳輸。
②那麼在us1,us2,us3中都至少有一個map任務,當map輸出後通過洗牌,會根據key值的不一樣生成不少組以key不一樣的數據,好比咱們輸出了(k21 : [v21]),(k22 : [v22])。咱們知道前面的map是並行執行的(多個map同時運行,由於處理的數據在不一樣的數據塊),當咱們的reduce爲默認的時候是有1個,是有一個reduce因此不多是並行。咱們的reduce只有一個,而又兩組數據那麼哪一個先執行?Hadoop是這樣規定的,咱們對數據進行分組是根據key值來分組的。那麼Hadoop會讓這一系列的key去比較大小,最小的先進入執行,執行完成後,按照從小到大去執行。
③當reduce任務執行完成以後會生成一個文件:part-r-00000
若是咱們有2個reduce,也有2組數據,那麼這個並行計算如何進行。
Hadoop會讓每一組數據的key值得hash值去和reduce的個數取餘,餘數是幾那麼就進入哪一個reduce。固然前提是給reduce編號(編號是Hadoop內部本身會去編)。
第一個reduce生成的是part-r-00000,第二個則是part-r-00001(後面的00000和00001就是reduce的編號)。例如:當第一組數據key的hash值與reduce個數取餘爲0則會讓第一個reduce執行,當第二組數據key的hash值與reduce個數取餘也爲0,一樣會讓第一個reduce執行。這樣第二個reduce一樣會生成一個結果文件,第一個文件裏面存放的是第一組和第二組數據結果,第二個文件爲空。
數據分組和數據分片
①數據分片:
咱們把進入map端的數據叫作數據分片。每個數據塊進入MapReudce中的map程序的時候,咱們把它叫作數據分片。
那什麼樣的數據是一個數據分片?HDFS集羣上的一個數據塊的數據對應咱們所說的數據分片。
也就是每個數據分片由每個map任務去處理。
②數據分組:
數據通過map處理以後分紅不一樣的組造成數據的過程叫作數據分組。