Whenever each map task produces a large amount of intermediate output that a reduce task later has to merge, that intermediate data must be shipped from the node running the map to the node running the reduce. This involves a lot of disk I/O and network transfer, which lowers the efficiency of the job.
In this situation we should first do a local, preliminary merge of the map's intermediate output, shrinking the amount of intermediate data before it is sent to the reduce side for the final computation.
Functionally, a combiner is just a reducer; only where it runs and what data it sees are different: a combiner runs on the node where the map task runs, and it processes only the intermediate output of that map task. Note that Hadoop may apply the combiner zero, one, or several times to the map output, so the combine step must not change the final result however often it is applied.
A combiner is therefore implemented by extending Reducer as well, and it is registered when the job is configured:
// set the combiner class
job.setCombinerClass(AverageCombiner.class);
tmpIn.txt
014399999999999/1992-01-31/10
014399999999999/1992-02-28/11
014399999999999/1992-03-31/14
014399999999999/1992-04-30/16
014399999999999/1992-05-51/30
014399999999999/1992-06-30/33
014399999999999/1992-07-31/35
014399999999999/1993-01-31/10
014399999999999/1993-02-28/14
014399999999999/1993-03-31/13
014399999999999/1993-04-30/25
014399999999999/1993-05-31/30
014399999999999/1993-06-30/36
014399999999999/1993-07-31/38
014399999999999/1994-01-31/10
014399999999999/1994-02-28/14
014399999999999/1994-03-31/13
014399999999999/1994-04-30/25
014399999999999/1994-05-31/30
014399999999999/1994-06-30/36
We want to compute the average temperature for each year. If this file is processed by several map tasks, we can first compute, on each map, the yearly averages for that map's share of the data, and then have the reducer combine all of the partial results into the overall yearly averages. Since an average of averages is only correct when each partial average is weighted by the number of records behind it, each partial result carries the record count along with the average.
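A quick worked example (hypothetical numbers) shows why the count has to travel with the partial average: suppose map 1 sees three 1992 records averaging 10 and map 2 sees a single 1992 record of 30. A naive average of the two partial averages gives (10 + 30) / 2 = 20, but the true yearly mean is (3 * 10 + 1 * 30) / 4 = 15. The reducer can only compute the correct value if it knows how many records each partial average represents, which is exactly what the AverageValue type below stores.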
The average value object
package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

/**
 * Holds an average value together with the number of records it was computed from.
 *
 * @author Administrator
 */
public class AverageValue implements Writable {

    // number of records that went into the average
    private VIntWritable num;
    // the average value
    private DoubleWritable avgValue;

    public AverageValue() {
        num = new VIntWritable();
        avgValue = new DoubleWritable();
    }

    public void write(DataOutput out) throws IOException {
        num.write(out);
        avgValue.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        num.readFields(in);
        avgValue.readFields(in);
    }

    public VIntWritable getNum() {
        return num;
    }

    public void setNum(VIntWritable num) {
        this.num = num;
    }

    public DoubleWritable getAvgValue() {
        return avgValue;
    }

    public void setAvgValue(DoubleWritable avgValue) {
        this.avgValue = avgValue;
    }
}
Implementing the computation
package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;

public class AverageTempCombiner extends Configured implements Tool {

    // mapper: emits (year, AverageValue(count = 1, value = temperature))
    static class AverageMapper extends Mapper<LongWritable, Text, Text, AverageValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, AverageValue>.Context context)
                throws IOException, InterruptedException {
            // each input line looks like: 014399999999999/1992-01-31/10
            String line = value.toString();
            if (line != null && line.length() > 0) {
                String[] strs = line.split("/");
                if (strs.length == 3) {
                    String time = strs[1];
                    // extract the year
                    String year = time.substring(0, time.indexOf("-"));
                    Double temp = Double.parseDouble(strs[2]);
                    // build the average object
                    AverageValue averageValue = new AverageValue();
                    averageValue.setNum(new VIntWritable(1));
                    averageValue.setAvgValue(new DoubleWritable(temp));
                    // emit the year together with the average object
                    context.write(new Text(year), averageValue);
                }
            }
        }
    }

    // A combiner is essentially a reducer, so it also extends Reducer; the difference
    // is that it runs on the map side and only aggregates the local map task's output.
    static class AverageCombiner extends Reducer<Text, AverageValue, Text, AverageValue> {
        @Override
        protected void reduce(Text key, Iterable<AverageValue> values,
                Reducer<Text, AverageValue, Text, AverageValue>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            // build the combined AverageValue that becomes the reducer's input
            AverageValue avgValue = new AverageValue();
            avgValue.setNum(new VIntWritable(num));
            avgValue.setAvgValue(new DoubleWritable(sumValue / num));
            context.write(key, avgValue);
        }
    }

    // reducer: computes the final weighted average per year
    static class AverageReducer extends Reducer<Text, AverageValue, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<AverageValue> values,
                Reducer<Text, AverageValue, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            context.write(key, new DoubleWritable(sumValue / num));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(AverageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AverageValue.class);

        job.setReducerClass(AverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // set the combiner class
        job.setCombinerClass(AverageCombiner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AverageTempCombiner(), args));
    }
}
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AverageTempCombiner -Dinput=input/tempIn -Doutput=output/tempCombiner
The input data is the same as in the first example.
This time we compute the average temperature of each weather station for each year, so the key has to combine the year and the station id (a composite key).
We therefore write a YearStation class. Because YearStation must be both serializable and orderable, it has to implement WritableComparable<T>.
The hash value matters because of partitioning, that is, deciding which reduce task each record goes to: the partition is determined by the key's hashCode modulo the number of reduce tasks. (The default hash value of an IntWritable is simply the int it wraps.) So data partitioning is driven mainly by the hashCode of the key.
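For reference, the behaviour of Hadoop's default HashPartitioner boils down to the following sketch (the class name here is made up; the logic mirrors the stock partitioner):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default partitioning rule: clear the sign bit of the key's
// hashCode, then take the remainder modulo the number of reduce tasks.
public class HashPartitionSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}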
The key also needs to be comparable because, among the groups of records that reach the same reduce task, the order in which they are processed is decided by comparing key values: the smaller key goes first.
If we do not override hashCode, Object's hashCode() method is used, which is based on object identity: when a single YearStation object is reused for every record, every record gets the same hash code, and logically equal keys held in different objects do not hash alike. So we should override both hashCode and equals. Since year and stationId must both take part in partitioning, the overridden hashCode has to depend on both fields.
Note: in this example we also need to override toString(), because this key is eventually written into the result file on HDFS; without it the output might just be the default object representation of a YearStation. And what separates the key and the value in the reduce output? A tab character ("\t").
The combined key
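The original YearStation source is not reproduced here; a minimal sketch that is consistent with the requirements above and with the setYear/setStation calls used in the job below could look like this (field names, hash function, and sort order are assumptions):

package com.jf.obj;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Combined key: year + station id. Sketch only; the real class may differ.
 */
public class YearStation implements WritableComparable<YearStation> {

    private Text year = new Text();
    private Text station = new Text();

    public void write(DataOutput out) throws IOException {
        year.write(out);
        station.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        station.readFields(in);
    }

    // sort order: first by year, then by station id; the smaller key is processed first
    public int compareTo(YearStation o) {
        int cmp = year.compareTo(o.year);
        return cmp != 0 ? cmp : station.compareTo(o.station);
    }

    // both fields take part in partitioning, as discussed above
    @Override
    public int hashCode() {
        return year.hashCode() * 31 + station.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof YearStation)) {
            return false;
        }
        YearStation other = (YearStation) obj;
        return year.equals(other.year) && station.equals(other.station);
    }

    // the key is written to the output file, so give it a readable form
    @Override
    public String toString() {
        return year + "\t" + station;
    }

    public Text getYear() { return year; }
    public void setYear(Text year) { this.year = year; }
    public Text getStation() { return station; }
    public void setStation(Text station) { this.station = station; }
}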
The combiner job
package com.jf.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jf.obj.AverageValue;
import com.jf.obj.YearStation;

public class AvgByYearStationCombiner extends Configured implements Tool {

    static class AvgMapper extends Mapper<LongWritable, Text, YearStation, AverageValue> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, YearStation, AverageValue>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line != null && line.length() > 0) {
                String[] strs = line.split("/");
                if (strs.length == 3) {
                    String time = strs[1];
                    String year = time.substring(0, time.indexOf("-"));
                    // combined key made of year and station id
                    YearStation yearStation = new YearStation();
                    yearStation.setYear(new Text(year));
                    yearStation.setStation(new Text(strs[0]));
                    // average object: one record with its temperature
                    AverageValue averageValue = new AverageValue();
                    averageValue.setNum(new VIntWritable(1));
                    averageValue.setAvgValue(new DoubleWritable(Double.parseDouble(strs[2])));
                    System.out.println("combiner:" + yearStation + "==" + averageValue.getAvgValue().get() + "X"
                            + averageValue.getNum().get());
                    context.write(yearStation, averageValue);
                }
            }
        }
    }

    static class AvgCombiner extends Reducer<YearStation, AverageValue, YearStation, AverageValue> {
        @Override
        protected void reduce(YearStation key, Iterable<AverageValue> values,
                Reducer<YearStation, AverageValue, YearStation, AverageValue>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            // build the intermediate average object
            AverageValue avgValue = new AverageValue();
            avgValue.setNum(new VIntWritable(num));
            avgValue.setAvgValue(new DoubleWritable(sumValue / num));
            System.out.println("combiner:" + key + "==" + avgValue.getAvgValue().get() + "X" + avgValue.getNum().get());
            context.write(key, avgValue);
        }
    }

    static class AvgReducer extends Reducer<YearStation, AverageValue, YearStation, DoubleWritable> {
        @Override
        protected void reduce(YearStation key, Iterable<AverageValue> values,
                Reducer<YearStation, AverageValue, YearStation, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            int num = 0;
            double sumValue = 0;
            for (AverageValue averageValue : values) {
                num += averageValue.getNum().get();
                sumValue += averageValue.getAvgValue().get() * averageValue.getNum().get();
            }
            System.out.println("combiner:" + key + "==" + sumValue / num);
            context.write(key, new DoubleWritable(sumValue / num));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(AvgMapper.class);
        job.setMapOutputKeyClass(YearStation.class);
        job.setMapOutputValueClass(AverageValue.class);

        job.setCombinerClass(AvgCombiner.class);

        job.setReducerClass(AvgReducer.class);
        job.setOutputKeyClass(YearStation.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AvgByYearStationCombiner(), args));
    }
}
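Assuming the same jar and input directory as in the first example (the output directory below is made up), the job could be launched the same way:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.combiner.AvgByYearStationCombiner -Dinput=input/tempIn -Doutput=output/stationCombiner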
Execution result
Intermediate processing in the logs