hadoop筆記六:MapReduce基礎

1.概念

MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算,用於解決海量數據的計算問題。java

MapReduce分兩部分組成web

①映射(Mapping):對集合裏面的每個目標進行相同的操做,好比你要將一個表單裏面的每一個單元格作乘以2的操做,那麼你就能夠將乘以2這個方法應用到表單裏面的每一個單元格上面。apache

②化簡(Reducing):遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裏一列數字的和這個任務屬於reducing。編程

執行過程:你向MapReduce框架提交一個計算做業時,它會首先把計算做業拆分紅若干個Map任務,而後分配到不一樣的節點上去執行,每個Map任務處理輸入數據中的一部分,當Map任務完成後,它會生成一些中間文件,這些中間文件將會做爲Reduce任務的輸入數據。Reduce任務的主要目標就是把前面若干個Map的輸出彙總到一塊兒並輸出。app

2.執行過程

①Map函數:(k1 : v1) -->[(k2 : v2)]框架

→輸入:鍵值對(k1 : v1)表示的數據。ide

→處理:文檔數據記錄(如文本文件中的一行)以鍵值對的形式傳入map函數,處理完成以後以另外一種鍵值對的形式輸出處理結果[(k2 : v2)]。函數

→輸出:鍵值對[(k2 : v2)]表示的一組中間數據。oop

②Reduce函數:(k2 : [v2]) --> [(k3 : k4)]測試

→輸入:map輸出的一組鍵值對[(k2 : v2)]將被進行合併處理將一樣主鍵下的不一樣值合併到一個列表[v2]中,故reduce的輸入爲(k2 : [v2])。

→處理:對傳入的中間結果列表數據進行某種整理或進一步處理,並輸出最終的某種的鍵值對形式的輸出結果[(k3 : k4)]。

→輸出:鍵值對[(k3 : k4)]表示最終數據。

注意:各個map函數對所劃分的數據進行並行處理,從不一樣的輸入數據產生不一樣的輸出數據。進行reduce處理以前必須等到全部的map函數作完。最終彙總全部的reduce的輸出結果便可得到最終結果。

3.舉例:統計姓名

1)統計下面文件中姓名出現的次數

1,sean

2,bob

3,sean

4,bob

5,jf

從上面的數據分析出,咱們須要的是一行數據中的後一個數據,在map函數中,輸入端v1表明的是一行數據,輸出端的k2能夠表明是被統計的姓名。在reduce函數中,k2仍是被統計的姓名,而[v2]是一個數據集,這裏是將k2相同的鍵的v2數據合併起來。輸出的是本身須要的數據k3表明的是統計的姓名,v3是姓名出現的次數。

代碼實現:

解析文件數據

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class NameRecordParser {

	private String nameId;
	private String name;
	private boolean valid;

	// 解析每行數據
	public void parse(String line) {
		String[] strs = line.split(",");
		if (strs.length == 2) {
			nameId = strs[0].trim();
			name = strs[1].trim();
			if (nameId.length() > 0 && name.length() > 0) {
				valid = true;
			}
		}
	}

	public void parse(Text line) {
		parse(line.toString());

	}

	public String getNameId() {
		return nameId;
	}

	public void setNameId(String nameId) {
		this.nameId = nameId;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}

}

MapReduce處理

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class NameReference extends Configured implements Tool {

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		String input = conf.get("input");
		String output = conf.get("output");

		// 構建做業配置
		Job job = Job.getInstance(conf, "NameReference");
		// 設置做業所要執行的類
		job.setJarByClass(this.getClass());
		// 設置自定義的mapper類,以及tapper類的輸出key和value類型。
		job.setMapperClass(NameMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 設置自定義的Reducer類以及輸出時的類型
		job.setReducerClass(NameReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 設置讀取最原始數據的格式信息以及
		// 數據輸出到HDFS集羣中的格式信息
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.addInputPath(job, new Path(input));
		TextOutputFormat.setOutputPath(job, new Path(output));

		return job.waitForCompletion(true) ? 0 : 1;
	}

	private static class NameMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		private NameRecordParser parser = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			this.parser = new NameRecordParser();
			parser.parse(value);
			System.out.println(value);
			if (parser.isValid()) {
				Text resultKey = new Text(parser.getName());
				IntWritable resultValue = new IntWritable(1);
				System.out.println("map:resultKey=" + resultKey.toString() + ",value=" + resultValue.get());
				context.write(resultKey, resultValue);
			}
		}

	}

	private static class NameReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable intWritable : values) {
				count += intWritable.get();
			}
			System.out.println("reduce:key=" + key + ",value=" + count);
			context.write(key, new IntWritable(count));
		}
	}

	public static void main(String[] args) {
		try {
			System.exit(ToolRunner.run(new NameReference(), args));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

2)執行測試結果

新建測試數據文件

提交文件到hadoop文件系統中

確認文件提交成功

執行MapReduce分析數據

能夠經過web查看執行進度

http://192.168.1.113:8088/cluster/apps

查看執行結果

也能夠經過日誌查看執行過程

4.舉例:統計每一年最大氣溫

1)統計下面數據中每一年最高氣溫

014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
014399999999999/1994-07-31/35

提交數據文件到文件系統中

代碼解析數據文件

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class WeatherRecordParser {

	private String stationId;
	private String year;
	private int temperature = -999;
	private boolean valid;

	/**
	 * 解析數據
	 * 
	 * @param line
	 *            氣象站/年月日/溫度 
	 *  014399999999999/1992-01-31/10
	 */
	public void parse(String line) {
		String[] strs = line.split("/");
		if (strs.length == 3) {
			if (strs[0] != null && strs[0].length() > 0) {
				stationId = strs[0];
			}
			if (strs[1] != null && strs[1].length() > 0) {
				year = strs[1].substring(0, 4);
			}
			if (strs[2] != null && strs[2].length() > 0) {
				temperature = Integer.parseInt(strs[2]);
			}
			if (stationId != null && year != null & temperature > -999) {
				valid = true;
			}
		}
	}

	public void parse(Text value) {
		parse(value.toString());
	}

	public String getStationId() {
		return stationId;
	}

	public void setStationId(String stationId) {
		this.stationId = stationId;
	}

	public String getYear() {
		return year;
	}

	public void setYear(String year) {
		this.year = year;
	}

	public int getTemperature() {
		return temperature;
	}

	public void setTemperature(int temperature) {
		this.temperature = temperature;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}
}

統計每一年最大氣溫

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 獲取每一年最高溫度
 * 
 * @author Administrator
 *
 */
public class MaxTemperatureByYear extends Configured implements Tool {

	private static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		private WeatherRecordParser parser = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			parser = new WeatherRecordParser();
			parser.parse(value);
			if (parser.isValid()) {
				Text resultKey = new Text(parser.getYear());
				IntWritable resultValue = new IntWritable(parser.getTemperature());
				System.out.println("map:resultKey=" + resultKey + ",resultValue=" + resultValue);
				context.write(resultKey, resultValue);
			}
		}
	}

	private static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// 獲取相同key中最大的值
			int max = Integer.MIN_VALUE;
			for (IntWritable intWritable : values) {
				if (intWritable.get() > max) {
					max = intWritable.get();
				}
			}
			System.out.println("reducer:key=" + key + ",value=" + max);
			context.write(key, new IntWritable(max));
		}
	}

	public int run(String[] args) throws Exception {

		Configuration conf = getConf();
		// 構建做業所處理數據的輸入輸出路徑
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));
		// 構建做業配置
		Job job = Job.getInstance(conf, MaxTemperatureByYear.class.getName());
		// 設置做業所要執行的類
		job.setJarByClass(MaxTemperatureByYear.class);
		// 設置自定義的mapper類,以及tapper類的輸出key和value類型。
		job.setMapperClass(MaxTempMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// 設置自定義的Reducer類以及輸出時的類型
		job.setReducerClass(MaxTempReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 設置讀取最原始數據的格式信息,以及數據輸出到HDFS集羣中的格式信息
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MaxTemperatureByYear(), args));
	}

}

提交任務執行

2)執行結果

File System Counters
		FILE: Number of bytes read=237
		FILE: Number of bytes written=233027
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=755
		HDFS: Number of bytes written=168
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=7929
		Total time spent by all reduces in occupied slots (ms)=8445
		Total time spent by all map tasks (ms)=7929
		Total time spent by all reduce tasks (ms)=8445
		Total vcore-seconds taken by all map tasks=7929
		Total vcore-seconds taken by all reduce tasks=8445
		Total megabyte-seconds taken by all map tasks=8119296
		Total megabyte-seconds taken by all reduce tasks=8647680
	Map-Reduce Framework
		Map input records=21
		Map output records=21
		Map output bytes=189
		Map output materialized bytes=237
		Input split bytes=106
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=237
		Reduce input records=21
		Reduce output records=21
		Spilled Records=42
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=221
		CPU time spent (ms)=2330
		Physical memory (bytes) snapshot=310419456
		Virtual memory (bytes) snapshot=1687691264
		Total committed heap usage (bytes)=164040704
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=649
	File Output Format Counters 
		Bytes Written=168

能夠經過web頁面查看執行狀態

查看執行日誌

map計算日誌輸出

reduce計算統計日誌

在文件系統中查看執行結果

5.配置執行reduce個數(map個數跟存儲文件的block個數有關)

1)經過配置文件設置

修改配置文件:yarn-site.xml

從新執行時能夠看到reduce個數爲2

2)經過程序進行修改

測試執行

3)經過執行命令設置

6.程序在執行時如何選擇reduce

任務分配:

①假設咱們有一個HDFS集羣有4個節點分別是us1,us2,us3,us4。Yarn集羣的主節點在分配資源的時候,當你客戶端將做業提交的時候,resourcemanager在分配資源(或者說分配做業)的時候,儘可能將應用程序分發到有數據的節點上。這樣就避免了數據在節點與節點之間傳輸。

②那麼在us1,us2,us3中都至少有一個map任務,當map輸出後通過洗牌,會根據key值的不一樣生成不少組以key不一樣的數據,好比咱們輸出了(k21 : [v21]),(k22 : [v22])。咱們知道前面的map是並行執行的(多個map同時運行,由於處理的數據在不一樣的數據塊),當咱們的reduce爲默認的時候是有1個,是有一個reduce因此不多是並行。咱們的reduce只有一個,而又兩組數據那麼哪一個先執行?Hadoop是這樣規定的,咱們對數據進行分組是根據key值來分組的。那麼Hadoop會讓這一系列的key去比較大小,最小的先進入執行,執行完成後,按照從小到大去執行

③當reduce任務執行完成以後會生成一個文件:part-r-00000

若是咱們有2個reduce,也有2組數據,那麼這個並行計算如何進行。

 Hadoop會讓每一組數據的key值得hash值去和reduce的個數取餘,餘數是幾那麼就進入哪一個reduce。固然前提是給reduce編號(編號是Hadoop內部本身會去編)。

第一個reduce生成的是part-r-00000,第二個則是part-r-00001(後面的00000和00001就是reduce的編號)。例如:當第一組數據key的hash值與reduce個數取餘爲0則會讓第一個reduce執行,當第二組數據key的hash值與reduce個數取餘也爲0,一樣會讓第一個reduce執行。這樣第二個reduce一樣會生成一個結果文件,第一個文件裏面存放的是第一組和第二組數據結果,第二個文件爲空。

 

數據分組和數據分片

①數據分片:

咱們把進入map端的數據叫作數據分片。每個數據塊進入MapReudce中的map程序的時候,咱們把它叫作數據分片。

那什麼樣的數據是一個數據分片?HDFS集羣上的一個數據塊的數據對應咱們所說的數據分片。 

也就是每個數據分片由每個map任務去處理。

②數據分組:

數據通過map處理以後分紅不一樣的組造成數據的過程叫作數據分組。

相關文章
相關標籤/搜索