Hadoop-MapReduce的shuffle過程及其餘

Hadoop-MapReduce的shuffle過程及其餘

    本篇文章將介紹shuffle的過程以及MapReduce中的其餘一些組件。java

1、Shuffle

    Shuffle實際上是一個過程,並非MapperReducer的一個組件,這個過程是從map輸出數據,到reduce接收處理數據以前,橫跨Mapper和Reducer兩端的,以下圖:node

    shuffle分爲Mapper階段和Reducer階段,下面就兩個階段作具體分析。正則表達式

1.Mapper階段

    每一個MapperTask有一個環形內存緩衝區,用於存儲map任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值默認0.8(io.sort.spill.percent),就會啓動一個後臺線程把環形緩衝區中的內容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。shell

    這裏有一個map()方法(簡稱m)寫入的速度和spill溢出(簡稱s)的速度,m的數據在內存中移動,s是數據由內存到磁盤,雖然s是磁盤的連續寫,可是也比不上m的內存速度,產生的現象是s在前面跑,m在後面追,m的速度>s的速度,那麼m確定在一個時間節點上會追上s,那麼當m追上s的時候,m寫入環形緩衝區的線程就會被阻塞暫停,直到s將環形緩衝區中的數據所有寫入到磁盤中,m的寫入線程纔會被啓動。因此環形緩衝區大小和閥值的大小是能夠根據業務進行調優的點。bash

    寫磁盤前,要partition、sort、Combiner。若是有後續的數據,將會繼續寫入環形緩衝區中,最終寫入下一個溢出文件中。app

    等最後記錄寫完,合併所有溢出寫文件爲一個分區且排序的文件。ide

    若是在最終合併時,被合併的文件大於等於3個,則合併完會再執行一次Combiner,不然不會。工具

    總體Mapper階段的流程以下:oop

input->map->buffer->split(partition-sort-combiner)->merge(partition-sort-combiner(file>=3))->數據落地。性能

2.Reducer階段

    Reducer主動找Mapper獲取本身負責的分區的數據,並不須要全部的Mapper都執行完成後再獲取,哪一個Mapper執行完,當即就去複製。

    複製後,來自多個Mapper的數據要進行merge合併操做。合併後進行分組、排序,造成k3v3,進入reduce處理,處理後產生的結果輸出到目的地。

    總體Reducer階段流程以下:

    fetch->merge(combiner)->grouping->sort->reduce->output。

二、Mapper數量

    Mapper的數量在默認狀況下不可直接控制干預,Mapper的數量由輸入的大小和個數決定。在默認狀況下,最終input佔據了多少block,就應該啓動多少個Mapper。

    此種狀況下,若是有大量的小文件須要處理,則會形成Hadoop集羣崩潰。大量的小文件,每一個小文件都獨佔一個Mapper處理線程,這樣啓動線程和關閉線程消耗的資源會很龐大,文件數量到達一個量級會直接致使集羣崩潰。

    鑑於以上狀況,能夠經過配置mapred.min.split.size來控制split的size的最小值。當每一個split的大小達不到設置的最小值,Hadoop會將這些達不到最小值的split拼接到一塊兒,使用一個Mapper來處理這些文件,當大小超過最小值,才啓動一個新Mapper進行處理。這樣就能夠避免Mapper線程過多致使集羣崩潰的結果。

案例

1.求最大/小值

    求一組數據的最大值或者最小值。

    數據樣例:

123
235345
234
654768
234
4545
324

1>MaxMapper

public class MaxMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
	private int max = Integer.MIN_VALUE;
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context)
					throws IOException, InterruptedException {
		int num = Integer.parseInt(value.toString());
		max = max < num ? num : max;
	}
	@Override
	protected void cleanup(Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(new IntWritable(max), NullWritable.get());
	}
}

 

2>MaxReducer

public class MaxReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
	private int max=Integer.MIN_VALUE;
	@Override
	protected void reduce(IntWritable k3, Iterable<NullWritable> v3s,
			Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context)
					throws IOException, InterruptedException {
		int num=Integer.parseInt(k3.toString());
		max=max>num?max:num;
	}
	@Override
	protected void cleanup(Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(new IntWritable(max), NullWritable.get());
	}
}

3>MaxDriver

public class MaxDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Max_Job");
		job.setJarByClass(cn.tedu.max.MaxDriver.class);
		job.setMapperClass(cn.tedu.max.MaxMapper.class);
		job.setReducerClass(cn.tedu.max.MaxReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/maxdata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/maxresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

2.統計考試成績

    按月份產生文件,統計每一個人的語數外及總分。

    數據樣例:

math.txt

1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96
1 li 45
2 li 76
3 li 67

chinese.txt

1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27
1 li 77
2 li 66
3 li 89

english.txt)

1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86
1 li 76
2 li 84
3 li 93
//如下代碼涉及到的重要方法
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fn = inputSplit.getPath().getName();

1>ScoreBean

public class ScoreBean implements Writable {
	private String name;
	private String subject;
	private int month;
	private int score;
   //這裏省去了如下方法,記得補上
   //……get/set……
   //……有參/無參構造……
   //……read/write……
}

2>Mapper

public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ScoreBean>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String attr[] = line.split(" ");
		String name = attr[1];
		FileSplit fs = (FileSplit) context.getInputSplit();
		String path = fs.getPath().getName();
		String subject = path.substring(0, path.lastIndexOf("."));
		ScoreBean sb = new ScoreBean(attr[1], subject, Integer.parseInt(attr[0]), Integer.parseInt(attr[2]));
		context.write(new Text(name), sb);
	}
}

3>Reducer

public class ScoreReducer extends Reducer<Text, ScoreBean, Text, NullWritable> {
	@Override
	protected void reduce(Text k3, Iterable<ScoreBean> v3s,
			Reducer<Text, ScoreBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
		String name = k3.toString();
		Map<String, Integer> map = new HashMap<String, Integer>();
		int count = 0;
		Iterator<ScoreBean> it = v3s.iterator();
		while (it.hasNext()) {
			ScoreBean sb = it.next();
			map.put(sb.getSubject(), sb.getScore());
			count += sb.getScore();
		}
		String result = name + " " + map.get("chinese") + " " + map.get("math") + " " + map.get("english") + " "
				+ count;
		context.write(new Text(result), NullWritable.get());
	}
}

4>Partitioner

public class ScoreMonthPartitioner extends Partitioner<Text, ScoreBean>{
	@Override
	public int getPartition(Text key, ScoreBean value, int numPartitions) {
		return value.getMonth()-1;
	}
}

5>Driver

public class ScoreDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Scorec_Job");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);
		job.setMapperClass(cn.tedu.score.ScoreMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ScoreBean.class);
		job.setReducerClass(cn.tedu.score.ScoreReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setPartitionerClass(ScoreMonthPartitioner.class);
		job.setNumReduceTasks(3);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/scoredata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/scoresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

2、MapReduce其餘功能

一、InputFormat

1.概述

    InputFormat:輸入格式化器。

    MapReduce開始階段階段,InputFormat類用來產生InputSplit,並把基於RecordReader它切分紅record,造成Mapper的輸入。

    Hadoop自己提供了若干內置的InputFormat,其中若是不明確指定默認使用TextInputFormat。

2.常見的InputFormat

1>TextInputFormat

    做爲默認的文件輸入格式,用於讀取純文本文件,文件被分爲一系列以LF或者CR結束的行,key是每一行的位置偏移量,是LongWritable類型的,value是每一行的內容,爲Text類型。

2>KeyValueTextInputFormat

    一樣用於讀取文件,若是行被分隔符(缺省是tab)分割爲兩部分,第一部分爲key,剩下的部分爲value;若是沒有分隔符,整行做爲key,value爲空。

3>SequenceFileInputFormat

    用於讀取sequence file。sequence file是Hadoop用於存儲數據自定義格式的binary文件。它有兩個子類:SequenceFileAsBinaryInputFormat,將key和value以BytesWritable的類型讀出;SequenceFileAsTextInputFormat,將key和value以Text類型讀出。

4>SequenceFileInputFilter

    根據filter從sequence文件中取得部分知足條件的數據,經過setFilterClass指定Filter,內置了三種Filter,RegexFilter取key值知足指定的正則表達式的記錄;PercentFilter經過指定參數f,取記錄行數%f==0的記錄;MD5Filter經過指定參數f,取MD5(key)%f==0的記錄。

5>NLineInputFormat

    0.18.x版本新加入,能夠將文件以行爲單位進行split,好比文件的每一行對應一個mapper。獲得的key是每一行的位置偏移量(LongWritable類型),value是每一行的內容,Text類型。適用於行少列多的文件。

6>CompositeInputFormat

    用於多個數據源的join。

    能夠經過job.setInputFormatClass(XxxInputFormat.class);來設定選用哪一種InputFormat。

3.自定義InputFormat

    若是以上InputFormat不夠用,咱們也能夠本身定義InputFormat。

    全部InputFormat都要直接或間接的繼承InputFormat抽象類。

1>InputFormat

    InputFormat抽象類中主要定義了以下兩個方法:

getSplits(JobContext context)

    生產InputSplit集合的方法,此方法接受JobContext接受環境信息,獲得要處理的文件信息後,進行邏輯切割,產生InputSplit集合返回。

List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

createRecordReader(InputSplit split,TaskAttemptContext context)

    此方法返回RecordReader對象。一個RecordReader包含方法描述如何從InputSplit切分出要送入Mapper的K一、V1對。

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;

2>FileInputFormat

    咱們能夠直接繼承InputFormat,但更多的時候咱們會選擇繼承他的一個實現子類,好比:FileInputFormat。此類是全部來源爲文件的InputFormat的基類,默認的TextInputFormat就繼承自它。

    FileInputFormat實現了InputFormat抽象類,實現了getSplits方法,根據配置去邏輯切割文件,返回FileSplit的集合,並提供了isSplitable()方法,子類能夠經過在這個方法中返回boolean類型的值代表是否要對文件進行邏輯切割,若是返回false則不管文件是否超過一個Block大小都不會進行切割,而將這個文件做爲一個邏輯塊返回。而對createRecordReader方法則沒有提供實現,設置爲了抽象方法,要求子類實現。

    若是想要更精細的改變邏輯切塊規則能夠覆蓋getSplits方法本身編寫代碼實現。

    而更多的時候,咱們直接使用父類中的方法而將精力放置在createRecordReader上,決定如何將InputSplit轉換爲一個個的Recoder。

    案例

    讀取score1.txt文件,從中每4行讀取成績,其中第一行爲姓名,後3行爲單科成績,計算總分,最終輸出爲姓名:總分格式的文件。

    文件內容樣例:

張三 
語文 97
數學 77
英語 69
李四 
語文 87
數學 57
英語 63
王五 
語文 47
數學 54
英語 39

    分析:

    在此例子中,須要按照每三行一次進行讀取稱爲一個InputSplit,通過比較可使用NLineInputFormat,可是爲了演示,使用自定義InputFormat來作這件事。

/**
 * 自定義InputFormat類,來實現每三行做爲一個Recorder觸發一次Mapper的效果
 */
public class MyFileInputFormat extends FileInputFormat<Text, Text> {

	/**
	 * 由於要每三行讀取做爲一個Recoder,因此若是切塊形成塊內數據行數不是3的倍數可能形成處理出問題
	 * 所以返回false,標識文件不要進行切塊
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * 返回自定義的MyRecordReader
	 */
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new MyRecordReader();
	}

}
/**
 * 自定義的RecordReader,代表瞭如何將一個InputSplit切割出一個個的Recorder
 *
 */
public class MyRecordReader extends RecordReader<Text, Text> {
	private LineReader lineReader = null;
	private Text key = null;
	private Text value = null;
	private boolean hasMore = true;

	/**
	 * 初始化的方法,將在InputSplit被切割前調用
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		//獲取文件路徑
		FileSplit fs = (FileSplit) split;
		Path path = fs.getPath();
		//獲取文件系統
		Configuration conf = context.getConfiguration();
		FileSystem fileSystem = path.getFileSystem(conf);
		//從文件系統中讀取文件路徑獲得文件流
		FSDataInputStream fin = fileSystem.open(path);
		//將文件流包裝爲行讀取器
		lineReader = new LineReader(fin);
	}

	/**
	 * 下一個keyvalue方法
	 * 返回值表當前是否讀取到了新的紀錄
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		key = new Text();
		value = new Text();

		Text temp = new Text();
		int count = 0;
		for (int i = 0; i < 4; i++) {
			int len = lineReader.readLine(temp);
			if (len == 0) {
				hasMore = false;
				break;
			} else {
				count++;
				value.append(temp.getBytes(), 0, temp.getLength());
				value.append("\r\n".getBytes(), 0, "\r\n".length());
				temp.clear();
			}
		}
		key.set(count+"");

		return count != 0;

	}

	/**
	 * 獲取當前key方法
	 */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	/**
	 * 獲取當前value方法
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * 獲取處理進度的方法,返回0.0f-1.0f之間的值表示進度
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (hasMore)
			return 0.0f;
		else
			return 1.0f;
	}

	/**
	 * 關閉資源的方法
	 * 當切割InputSplit結束會被調用用來釋放資源
	 */
	@Override
	public void close() throws IOException {
		lineReader.close();
	}

}
/**
 * 計算成績的Mapper
 *
 */
public class ScoreMapper extends Mapper<Text, Text, Text, Text> {
	
	public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		String str = value.toString();
		String lines [] = str.split("\r\n");
		String name = lines[0];
		for(int i = 1;i<lines.length;i++){
			context.write(new Text(name), new Text(lines[i]));
		}
	}
}
/**
 * 計算成績的Reducer
 */
public class ScoreReducer extends Reducer<Text, Text, Text, IntWritable> {

	
	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		String name = key.toString();
		int countFen = 0;
		for(Text v : values){
			String line = v.toString();
			String subject = line.split(" ")[0];
			int fen = Integer.parseInt(line.split(" ")[1]);
			countFen += fen;
		}
		
		context.write(new Text(name), new IntWritable(countFen));
	}

}
/**
 * ScoreDriver
 */
public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "score_job");
		job.setJarByClass(ScoreDriver.class);
		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
	
		//指定InputFormat爲自定義的MyFileInputFormat
		job.setInputFormatClass(MyFileInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result"));
		
		if (!job.waitForCompletion(true))
			return;
	}
}

 

4.MultipleInputs

    MultipleInputs能夠將多個輸入組裝起來,同時爲Mapper提供數據,當咱們但願從多個來源讀取數據時可使用。甚至,在指定來源時能夠爲不一樣來源的數據指定不一樣的InputFormat和Mapper以應對不一樣格式的輸入數據。

    此類上提供的靜態方法有:

/**
 * 指定數據來源及對應的InputFormat
 */
MultipleInputs.addInputPath(job, path, inputFormatClass);

/**
 * 指定數據來源及對應的InputFormat 和 Mapper
 */
MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass);

 

    案例

    改造上述案例,同時從另外一個文件score2.txt中讀取數據統計成績。score2.txt中的數據是一行爲一個學生的成績。

    數據樣例:

趙六 56 47 69
陳七 73 84 91
劉八 45 56 66

    代碼:

/**
 * 計算成績的Mapper
 */
public class ScoreMapper2 extends Mapper<LongWritable, Text, Text, Text> {
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String attrs [] = line.split(" ");
		String name = attrs[0];
		for(int i = 1;i<attrs.length;i++){
			if(i == 1){
				context.write(new Text(name), new Text("語文\t"+attrs[i]));
			}else if(i == 2){
				context.write(new Text(name), new Text("數學\t"+attrs[i]));
			}else if(i == 3){
				context.write(new Text(name), new Text("英語\t"+attrs[i]));
			}
		}
	}
}
/**
 * ScoreDriver
 */
public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "score_job");
		job.setJarByClass(ScoreDriver.class);
		//job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
	
		//指定InputFormat爲自定義的MyFileInputFormat
		job.setInputFormatClass(MyFileInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		//FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt"));
		MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score1.txt"), MyFileInputFormat.class,ScoreMapper.class);
		MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score2.txt"), TextInputFormat.class,ScoreMapper2.class);
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result"));
		
		if (!job.waitForCompletion(true))
			return;
		
	}
}

 

二、OutputFormat

1.概述

    OutputFormat:輸出格式化器。

    MapReduce結束階段,OutputFormat類決定了Reducer如何產生輸出。

    Hadoop自己提供了若干內置的OutputFormat,其中若是不明確指定默認使用TextOutputFormat。

2.常見的OutputFormat

1>FileOutputFormat

    (實現OutputFormat接口)- 全部輸出到文件的OutputFormats的基類。

2>TextOutputFormat

    以行分隔、包含製表符定界的鍵值對的文本文件格式。

3>SequenceFileOutputFormat

    二進制鍵值數據的壓縮格式。

4>SequenceFileAsBinaryOutputFormat

    原生二進制數據的壓縮格式。

5>MapFileOutputFormat

    一種使用部分索引鍵的格式。

6>MultipleOutputFormat

    使用鍵值對參數寫入文件的抽象類。

7>MultipleTextOutputFormat

    輸出多個以標準行分割、製表符定界格式的文件。

8>MultipleSequenceFileOutputFormat

    輸出多個壓縮格式的文件。

    能夠經過job.setOutputFormatClass(XxxoutputFormat.class);來設定選用哪一種OutputFormat。

3.自定義OutputFormat

    若是以上OutputFormat不夠用,一樣也能夠本身定義OutputFormat。

1>OutputFormat

    全部的OutputFormat都要直接或間接的繼承OutputFormat抽象類

    OutputFormat抽象類中定義了以下的抽象方法:

getRecordWriter(TaskAttemptContext context)

public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
public abstract void checkOutputSpecs(JobContext context ) throws IOException,InterruptedException;
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;

2>FileOutputFormat

    咱們能夠直接繼承OutputFormat,但更多的時候咱們會選擇繼承他的一個實現子類,好比FileOutputFormat。- 此類是全部目的地爲文件的OutputFormat的基類,例如默認的TextOutputFormat就繼承自它。

    FileOutputFormat實現了OutputFormat接口,默認實現了checkOutputSpecs和getOutputCommitter方法,並將getRecordWriter()設置爲抽象方法要求咱們去實現。

    若是想要更精細的改變邏輯能夠本身去編寫getOutputCommitter和checkOutputSpecs方法。

    而更多的時候,咱們直接使用父類中的方法而將精力放置在getRecordWriter上,決定如何產生輸出。

案例

    編寫wordcount案例,並將輸出按照'#'進行分割,輸出爲一行。

    數據樣例:

hello tom
hello joy
hello rose
hello joy
hello jerry
hello tom
hello rose
hello joy

    代碼以下:

public class MyRecordWriter<K,V> extends RecordWriter<K,V> {
	protected DataOutputStream out = null;
	private final byte[] keyValueSeparator;
	//public static final String NEW_LINE = System.getProperty("line.separator");
	public static final String NEW_LINE = "#";
	
	public MyRecordWriter(DataOutputStream out,String keyValueSeparator) {
		this.out = out;
		this.keyValueSeparator = keyValueSeparator.getBytes();
	}
	
	@Override
	public void write(K key, V value) throws IOException, InterruptedException {
		 if(key!=null){  
			 out.write(key.toString().getBytes());  
			 out.write(keyValueSeparator);  
		 }  
		 out.write(value.toString().getBytes());  
		 out.write(NEW_LINE.getBytes()); 
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		out.close();
	}
}
public class MyFileOutputFormat<K,V> extends FileOutputFormat<K,V> {
	@Override
	public RecordWriter<K,V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();  
		Path file = getDefaultWorkFile(context, "");  
		FileSystem fs = file.getFileSystem(conf);  
		FSDataOutputStream fileOut = fs.create(file, false);  
		return new MyRecordWriter<K,V>(fileOut, " ");
	}
}
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String words [] = line.split(" ");
		for(String word : words){
			context.write(new Text(word), new LongWritable(1));
		}
	}
}
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
	public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		Iterator<LongWritable> it = values.iterator();
		long count = 0;
		while(it.hasNext()){
			long c = it.next().get();
			count += c;
		}
		context.write(key, new LongWritable(count));
	}
}
public class WCDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WCJob");
		
		job.setJarByClass(cn.tedu.wc.WCDriver.class);
		job.setMapperClass(cn.tedu.wc.WCMapper.class);
		job.setReducerClass(cn.tedu.wc.WCReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		job.setOutputFormatClass(MyFileOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/park/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/park/result2"));

		if (!job.waitForCompletion(true))
			return;
	}

}

4.MultipleOutputs

    MultipleOutputs能夠令一個Reducer產生多個輸出文件。

    爲特定kv打上指定標記

MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context);
mos.write("flag", key, value);
/**
爲指定標記內容指定輸出方式
能夠指定多個
*/
MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);

    案例

    改造上面的wordcount案例,將首字母爲a-j的輸出到"small"中。其餘輸出到"big"中。

    代碼以下:

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String words [] = line.split(" ");
		for(String word : words){
			context.write(new Text(word), new LongWritable(1));
		}
	}
}
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
	MultipleOutputs<Text,LongWritable> mos = null;
	@Override
	protected void setup(Reducer<Text, LongWritable, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		this.mos = new MultipleOutputs<Text,LongWritable>(context);
	}
	
	public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		Iterator<LongWritable> it = values.iterator();
		long count = 0;
		while(it.hasNext()){
			long c = it.next().get();
			count += c;
		}
		if(key.toString().matches("[a-j][a-z]*")){
			mos.write("small", key, new LongWritable(count));
		}else{
			mos.write("big", key, new LongWritable(count));
		}
	}
}
public class WCDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WCJob");
		
		job.setJarByClass(cn.tedu.wc.WCDriver.class);
		job.setMapperClass(cn.tedu.wc.WCMapper.class);
		job.setReducerClass(cn.tedu.wc.WCReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		MultipleOutputs.addNamedOutput(job, "small", MyFileOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "big", MyFileOutputFormat.class, Text.class, IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.242.101:9000/park/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.242.101:9000/park/result2"));

		if (!job.waitForCompletion(true))
			return;
	}
}

三、GroupingComparator

    在reduce階段進行,對mapper發送過來的數據會進行分組的操做,這個過程稱爲爲Grouping。默認狀況下會將k2相同的內容做爲一組。

    咱們能夠經過job.setGroupingComparatorClass(MyGroupingComparator.class);方法本身指定Grouping規則。

1.WritableComparator

    案例

    改造WordCount案例,實現統計a-h 和 i-z開頭的單詞數量統計。

public class WCComparator extends WritableComparator {

	@Override
	public int compare(byte[] b1, int begin1, int len1, byte[] b2, int begin2, int len2) {
		try {
			DataInput in = new DataInputStream(new ByteArrayInputStream(b1,begin1,len1));
			Text ta = new Text();
			ta.readFields(in);
			
			in  = new DataInputStream(new ByteArrayInputStream(b2,begin2,len2));
			Text tb = new Text();
			tb.readFields(in);
			
			if(ta.toString().matches("^[a-n][a-z]*$") && tb.toString().matches("^[a-n][a-z]*$")){
				return 0;
			}else if(ta.toString().matches("^[o-z][a-z]*$") && tb.toString().matches("^[o-z][a-z]*$")){
				return 0;
			}else{
				return 1;
			}
		} catch (IOException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

}

 

四、二次排序

    所謂二次排序就是首先按照第一字段排序,而後再對第一字段相同的行按照第二字段排序,注意不能破壞第一個字段的排序順序。

/**
 * 開發bean封裝數據
 */
 public class NumBean implements WritableComparable<NumBean> {
	private int lnum;
	private int rnum;
	
	public NumBean() {
	}
	
	public NumBean(int lnum, int rnum) {
		this.lnum = lnum;
		this.rnum = rnum;
	}

	public int getLnum() {
		return lnum;
	}
	public void setLnum(int lnum) {
		this.lnum = lnum;
	}
	public int getRnum() {
		return rnum;
	}
	public void setRnum(int rnum) {
		this.rnum = rnum;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.lnum = in.readInt();
		this.rnum = in.readInt();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(lnum);
		out.writeInt(rnum);
	}

	@Override
	public int compareTo(NumBean o) {
		if(this.lnum != o.getLnum()){
			return this.lnum - o.getLnum();
		}else{
			return this.rnum - o.getRnum();
		}
	}
	
}
/**
 * 開發Mapper 用 NumBean做爲k2 因爲 NumBean覆蓋了compareTo方法 能夠實現自動二次排序 
 */
public class NumMapper extends Mapper<LongWritable, Text, NumBean, NullWritable> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NumBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//1.讀取行
		String line = value.toString();
		//2.切分出lnum 和  rnum
		String []  attrs = line.split(" ");
		int lnum = Integer.parseInt(attrs[0]);
		int rnum = Integer.parseInt(attrs[1]);
		//3.封裝數據到bean
		NumBean nb = new NumBean(lnum,rnum);
		//4.發送數據
		context.write(nb, NullWritable.get());
	}
}
/**
 * 開發Reducer輸出結果 shuffle階段已經完成了二次排序 此處直接輸出便可
 */
public class NumReducer extends Reducer<NumBean, NullWritable, IntWritable, IntWritable> {
	@Override
	protected void reduce(NumBean key, Iterable<NullWritable> values,
			Reducer<NumBean, NullWritable, IntWritable, IntWritable>.Context context)
			throws IOException, InterruptedException {
		int lnum = key.getLnum();
		int rnum = key.getRnum();
		context.write(new IntWritable(lnum), new IntWritable(rnum));
	}
}
/**
 * 爲了防止重複 數據被grouping成一條數據 形成結果丟失 自定義gourping過程 固定返回-1 表示不管什麼狀況都不合並數據
 */
public class NumWritableComparator extends WritableComparator {
	@Override
	public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
		return -1;
	}
}
/**
 * 開發Driver組裝程序
	 */
public class NumDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "num_job");
		job.setJarByClass(cn.tedu.mr.grouping.num.NumDriver.class);
		job.setMapperClass(cn.tedu.mr.grouping.num.NumMapper.class);
		job.setMapOutputKeyClass(NumBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setReducerClass(cn.tedu.mr.grouping.num.NumReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		job.setGroupingComparatorClass(NumWritableComparator.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/ndata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/nresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

五、數據傾斜處理

    在開發MR程序時,可能遇到的數據分配不一致,形成程序性能降低的問題,這個問題稱之爲數據傾斜問題。

    解決辦法:

    若是是由於shuffle分配數據不均勻形成數據傾斜,重寫parition均勻分配數據便可。

    若是是數據自己帶有傾斜的特色沒法經過修改parition來解決傾斜問題,那麼能夠經過如下幾個方法嘗試解決:

    1.利用combiner減輕傾斜的狀況。

    2.將形成傾斜的數據拿出單獨處理。

    3.將一個mr拆分紅多個mr下降傾斜形成的危害。

六、小文件處理

1.小文件在hadoop中會有什麼問題?

    每一個小文件不管多小都會對應一個block,而每個block在NameNode中都要有元數據的記錄,若是存在大量小文件,則NameNode中的大量空間都用來存放這些小文件的元數據信息,實際上是至關浪費的,對於NameNode的性能有比較大的影響。

    當使用mapreduce處理大量小文件時,默認狀況下mapreduce在進行切片操做時規則是和block切的規則同樣,即一個block一個InputSplit,而一個InputSplit就對應一個Mapper,這樣會形成開啓大量的MapperTask,可是每一個MapperTask處理的數據量都頗有限。極端狀況下開啓大量Mapper耗費內存甚至可能形成程序的崩潰。

2.Hadoop Archive

    Hadoop Archive或者HAR,是一個高效地將小文件放入HDFS塊中的文件存檔工具,它可以將多個小文件打包成一個HAR文件,這樣在減小namenode內存使用的同時,仍然容許對文件進行透明的訪問。

    HAR是在Hadoop file system之上的一個文件系統,所以全部fs shell命令對HAR文件都可用,只不過是文件路徑格式不同,HAR的訪問路徑能夠是如下兩種格式:

    har://scheme-hostname:port/archivepath/fileinarchive

    har:///archivepath/fileinarchive(本節點)

1>使用HAR時須要兩點

    第一,對小文件進行存檔後,原文件並不會自動被刪除,須要用戶本身刪除;

    第二,建立HAR文件的過程其實是在運行一個mapreduce做業,於是須要有一個hadoop集羣運行此命令。

2>HAR還有一些缺陷

    第一,一旦建立,Archives便不可改變。要增長或移除裏面的文件,必須從新建立歸檔文件。

    第二,要歸檔的文件名中不能有空格,不然會拋出異常,能夠將空格用其餘符號替換(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement參數)。

3>命令

hadoop archive -archiveName <NAME>.har -p <parent path>[-r <replication factor>]<src>* <dest>

    案例:將hdfs:///small中的內容壓縮成small.har

    將某個文件打成har:

hadoop archive -archiveName small.har -p /small/small1.txt /small

    將多個small開頭的文件打成har:

hadoop archive -archiveName small.har -p /small/small* /small

    將某個文件夾下全部文件打成har:

hadoop archive -archiveName small.har -p /small /small

    查看HAR文件存檔中的文件

hadoop fs -ls har:///small/small.har

    輸出har文件內容到本地系統

hadoop fs -get har:///small/small.har/smallx

3.SequenceFile

    SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。目前,也有很多人在該文件的基礎之上提出了一些HDFS中小文件存儲的解決方案,他們的基本思路就是將小文件進行合併成一個大文件,同時對這些小文件的位置信息構建索引。

    文件不支持複寫操做,不能向已存在的SequenceFile(MapFile)追加存儲記錄。

    當write流不關閉的時候,沒有辦法構造read流。也就是在執行文件寫操做的時候,該文件是不可讀取的。

@Test
/**
 * SequenceFile 寫操做
 */
public void SequenceWriter() throws Exception{
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/big";
	final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big2";
	
	//獲取文件系統
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
	
	//建立seq的輸出流
	Text key = new Text();
	Text value = new Text();
	SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path(OUTPUT_PATH), key.getClass(), value.getClass());

	//寫新的數據
	System.out.println(writer.getLength());
	key.set("small4.txt".getBytes());
	value.set("ddddddd".getBytes());
	writer.append(key, value);
	
	//關閉流
	IOUtils.closeStream(writer);
}

@Test
/**
 * SequenceFile 讀操做
 */
public void sequenceRead() throws Exception {
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq";
	
	//獲取文件系統
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
	
	//準備讀取seq的流
	Path path = new Path(INPUT_PATH);
	SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path, conf);
	//經過seq流得到key和value準備承載數據
	Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
	Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
	//循環從流中讀取key和value
	long position = reader.getPosition();
	while(reader.next(key, value)){
		//打印當前key value
		System.out.println(key+":"+value);
		//移動遊標指向下一個key value
		position=reader.getPosition();
	}
	//關閉流
	IOUtils.closeStream(reader);
}

@Test
/**
 * 多個小文件合併成大seq文件
 * @throws Exception
 */
public void small2Big() throws Exception{
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/small";
	final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq";
	//獲取文件系統
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fs = FileSystem.get(conf);
	//經過文件系統獲取全部要處理的文件
	FileStatus[] files = fs.listStatus(new Path(INPUT_PATH));
	//建立能夠輸出seq文件的輸出流
	Text key = new Text();
	Text value = new Text();
	SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(OUTPUT_PATH), key.getClass(),value.getClass());
	//循環處理每一個文件
	for (int i = 0; i < files.length; i++) {
		//key設置爲文件名
		key.set(files[i].getPath().getName());
		//讀取文件內容
		InputStream in = fs.open(files[i].getPath());
		byte[] buffer = new byte[(int) files[i].getLen()];
		IOUtils.readFully(in, buffer, 0, buffer.length);
		//值設置爲文件內容
		value.set(buffer);
		//關閉輸入流
		IOUtils.closeStream(in);
		//將key文件名value文件內容寫入seq流中
		writer.append(key, value);
	}
	//關閉seq流
	IOUtils.closeStream(writer);

}

4.CompositeInputFormat

    用於多個數據源的join。

    此類能夠解決多個小文件在進行mr操做時map建立過多的問題。

    此類的原理在於,它本質上市一個InputFormat,在其中的getSplits方法中,將他能讀到的全部的文件生成一個InputSplit

    使用此類須要配合自定義的RecordReader,須要本身開發一個RecordReader指定如何從InputSplit中讀取數據。

    也能夠經過參數控制最大的InputSplit大小。

CombineTextInputFormat.setMaxInputSplitSize(job, 256*1024*1024);

 

上一篇:Hadoop-MapReduce基本原理及相關操做

下一篇:Hadoop源碼導入Eclipse及問題解決

相關文章
相關標籤/搜索