Hadoop Notes 8: Optimizing MapReduce with a Combiner

1. Introduction

Every map task produces a large amount of intermediate output, and when the reduce phase then merges and computes over it, that intermediate data has to be shipped from the map node to the reduce node. This incurs a lot of disk I/O and network transfer, which lowers the efficiency of the job.

In that case we should first perform the necessary local aggregation of the map output on the map side, shrinking the volume of intermediate data before it is transferred to the reducer for the final computation.

2. Usage

Functionally a combiner is just a reducer; only where it runs and what data it sees differ: a combiner runs on the node that executed the map task and only processes the intermediate output of that one map task. Note that Hadoop may apply the combiner zero, one, or several times to the map output, so the combiner's output key/value types must match the map output types and its operation must stay correct under repeated application.

A combiner is likewise implemented by extending Reducer, and it is registered on the job when the job is configured:

// set the combiner class
job.setCombinerClass(AverageCombiner.class);

3. Example 1

1) Data

tmpIn.txt

014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36

2) Analysis

We want to compute the average temperature of each year. If the file is split across several map tasks, we can first aggregate on each map task and compute, per map, the average temperature of each year it has seen; the reducer then combines all of these partial results into the overall per-year averages.
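For example, if one map task's combiner emits (count = 3, average = 12.0) for 1992 and another map's combiner emits (count = 2, average = 20.0), the reducer computes (3 × 12.0 + 2 × 20.0) / (3 + 2) = 15.2, which is exactly the average over all five readings. This is why the intermediate value defined below carries the count alongside the average: partial averages can only be merged correctly when they are weighted by their counts.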

3) Code

The average value object

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

/**
 * Holds an average value together with the number of records it was computed from.
 * 
 * @author Administrator
 *
 */
public class AverageValue implements Writable {

	// number of records the average was computed from
	private VIntWritable num;
	// the average value
	private DoubleWritable avgValue;

	public AverageValue() {
		num = new VIntWritable();
		avgValue = new DoubleWritable();
	}

	public void write(DataOutput out) throws IOException {
		num.write(out);
		avgValue.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		num.readFields(in);
		avgValue.readFields(in);
	}

	public VIntWritable getNum() {
		return num;
	}

	public void setNum(VIntWritable num) {
		this.num = num;
	}

	public DoubleWritable getAvgValue() {
		return avgValue;
	}

	public void setAvgValue(DoubleWritable avgValue) {
		this.avgValue = avgValue;
	}

}

The job implementation

package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;

public class AverageTempCombiner extends Configured implements Tool {

	// mapper
	static class AverageMapper extends Mapper<LongWritable, Text, Text, AverageValue> {

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, AverageValue>.Context context)
				throws IOException, InterruptedException {
			// each line has the format: 014399999999999/1992-01-31/10
			String line = value.toString();
			if (line != null && line.length() > 0) {
				String[] strs = line.split("/");
				if (strs.length == 3) {
					String time = strs[1];
					// extract the year
					String year = time.substring(0, time.indexOf("-"));

					Double temp = Double.parseDouble(strs[2]);
					// build the average value object
					AverageValue averageValue = new AverageValue();
					averageValue.setNum(new VIntWritable(1));
					averageValue.setAvgValue(new DoubleWritable(temp));
					// emit the year together with its average value object
					context.write(new Text(year), averageValue);
				}
			}
		}
	}

	// combiner: essentially a reducer (it also extends Reducer), but it runs on the map side and performs a first, local aggregation
	// it only aggregates the map output produced on this node
	static class AverageCombiner extends Reducer<Text, AverageValue, Text, AverageValue> {
		@Override
		protected void reduce(Text key, Iterable<AverageValue> values,
				Reducer<Text, AverageValue, Text, AverageValue>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			// build the combined AverageValue that becomes the reducer's input
			AverageValue avgValue = new AverageValue();
			avgValue.setNum(new VIntWritable(num));
			avgValue.setAvgValue(new DoubleWritable(sumValue / num));
			context.write(key, avgValue);
		}
	}

	// reducer: performs the final per-year aggregation over the combiner outputs
	static class AverageReducer extends Reducer<Text, AverageValue, Text, DoubleWritable> {
		@Override
		protected void reduce(Text key, Iterable<AverageValue> values,
				Reducer<Text, AverageValue, Text, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			context.write(key, new DoubleWritable(sumValue / num));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(AverageMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(AverageValue.class);

		job.setReducerClass(AverageReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);

		// set the combiner class
		job.setCombinerClass(AverageCombiner.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new AverageTempCombiner(), args));
	}

}

4) Result

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AverageTempCombiner -Dinput=input/tempIn -Doutput=output/tempCombiner
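With the sample data above, the output contains one line per year; the averages work out to roughly 21.29 for 1992, 23.71 for 1993 and 21.33 for 1994 (DoubleWritable prints the full double value). You can also confirm that the combiner actually ran by checking the "Combine input records" and "Combine output records" counters printed when the job finishes.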

4. Example 2

1) Data

Same data as in Example 1.

2) Analysis

This time we compute the average temperature of each weather station for each year, so the key has to combine the year and the station id into a composite key.

We therefore write a YearStation class. Since YearStation must be both serializable and comparable, it implements WritableComparable<YearStation>.

The hash code matters for partitioning, that is, for deciding which reducer each key goes to: the default partitioner takes the key's hashCode modulo the number of reducers. (An IntWritable's hash value is simply the int it wraps.) So partitioning is driven mainly by the hashCode of the key.
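For reference, Hadoop's default partitioner does essentially the following; this is a simplified sketch of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, where the bit mask only serves to keep the result non-negative:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	// the key's hashCode, modulo the number of reduce tasks, decides the target reducer
	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}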

Keys need to be comparable because, among the key groups that end up in the same reducer, the keys are sorted to decide which group is processed first: the smaller key is processed first.

If we do not override hashCode(), Object's identity-based hashCode() is used, so when a YearStation object is reused, every record produces the same hash regardless of its content. We should therefore override hashCode() and equals(); and since both year and stationId must take part in partitioning, the overridden hashCode has to depend on both fields.

Note: in this job we also have to override toString(), because the key is written into the result file on HDFS; without it the output would just show an object reference of YearStation. As for how the reducer's output key and value are separated in that file, the separator is simply a tab character ("\t").

3) Code

Composite key

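The value type is the same AverageValue class as in Example 1, so it is not repeated here. The composite key YearStation is not listed in these notes; based on how the mapper below uses it (setYear(...), setStation(...)) and on the points in the analysis above (it implements WritableComparable, overrides hashCode() and equals() over both fields, and overrides toString() for the output file), a minimal sketch could look like the following; details of the original class may differ.

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key: year + weather station id (minimal sketch, not the original class).
 */
public class YearStation implements WritableComparable<YearStation> {

	// year, e.g. "1992"
	private Text year;
	// station id, e.g. "014399999999999"
	private Text station;

	public YearStation() {
		year = new Text();
		station = new Text();
	}

	public void write(DataOutput out) throws IOException {
		year.write(out);
		station.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		year.readFields(in);
		station.readFields(in);
	}

	// sort by year first, then by station id
	public int compareTo(YearStation o) {
		int cmp = year.compareTo(o.year);
		return cmp != 0 ? cmp : station.compareTo(o.station);
	}

	// both fields take part in partitioning
	@Override
	public int hashCode() {
		return year.hashCode() * 163 + station.hashCode();
	}

	@Override
	public boolean equals(Object obj) {
		if (!(obj instanceof YearStation)) {
			return false;
		}
		YearStation other = (YearStation) obj;
		return year.equals(other.year) && station.equals(other.station);
	}

	// the key is written to the result file, so make it readable
	@Override
	public String toString() {
		return year + "\t" + station;
	}

	public Text getYear() {
		return year;
	}

	public void setYear(Text year) {
		this.year = year;
	}

	public Text getStation() {
		return station;
	}

	public void setStation(Text station) {
		this.station = station;
	}
}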

The combiner job code

package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;
import com.jf.obj.YearStation;

public class AvgByYearStationCombiner extends Configured implements Tool {

	static class AvgMapper extends Mapper<LongWritable, Text, YearStation, AverageValue> {
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, YearStation, AverageValue>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			if (line != null && line.length() > 0) {
				String[] strs = line.split("/");
				if (strs.length == 3) {
					String time = strs[1];
					String year = time.substring(0, time.indexOf("-"));
					// composite key made of year and station
					YearStation yearStation = new YearStation();
					yearStation.setYear(new Text(year));
					yearStation.setStation(new Text(strs[0]));
					// the average value object
					AverageValue averageValue = new AverageValue();
					averageValue.setNum(new VIntWritable(1));
					averageValue.setAvgValue(new DoubleWritable(Double.parseDouble(strs[2])));
					System.out.println("combiner:" + yearStation + "==" + averageValue.getAvgValue().get() + "X"
							+ averageValue.getNum().get());
					context.write(yearStation, averageValue);
				}
			}
		}
	}

	static class AvgCombiner extends Reducer<YearStation, AverageValue, YearStation, AverageValue> {
		@Override
		protected void reduce(YearStation key, Iterable<AverageValue> values,
				Reducer<YearStation, AverageValue, YearStation, AverageValue>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			// intermediate average value produced by the combiner
			AverageValue avgValue = new AverageValue();
			avgValue.setNum(new VIntWritable(num));
			avgValue.setAvgValue(new DoubleWritable(sumValue / num));
			System.out.println("combiner:" + key + "==" + avgValue.getAvgValue().get() + "X" + avgValue.getNum().get());
			context.write(key, avgValue);
		}
	}

	static class AvgReducer extends Reducer<YearStation, AverageValue, YearStation, DoubleWritable> {
		@Override
		protected void reduce(YearStation key, Iterable<AverageValue> values,
				Reducer<YearStation, AverageValue, YearStation, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			int num = 0;
			double sumValue = 0;
			for (AverageValue averageValue : values) {
				num += averageValue.getNum().get();
				sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
			}
			System.out.println("combiner:" + key + "==" + sumValue / num);
			context.write(key, new DoubleWritable(sumValue / num));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(AvgMapper.class);
		job.setMapOutputKeyClass(YearStation.class);
		job.setMapOutputValueClass(AverageValue.class);

		job.setCombinerClass(AvgCombiner.class);

		job.setReducerClass(AvgReducer.class);
		job.setOutputKeyClass(YearStation.class);
		job.setOutputValueClass(DoubleWritable.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new AvgByYearStationCombiner(), args));
	}

}

4) Result
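The job is submitted in the same way as in Example 1 (the output directory name below is only an example):

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AvgByYearStationCombiner -Dinput=input/tempIn -Doutput=output/stationCombiner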

The execution result and the combiner's intermediate processing (the System.out lines above) can be seen in the output file and in the task logs.
