Example code (Java):
package com.vip09;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ScoreWritable implements WritableComparable<Object> {

    // In a custom data type it is recommended to stick to Java's primitive types
    private float chinese;
    private float math;
    private float english;
    private float physics;
    private float chemistry;

    // A custom data type must provide a no-argument constructor
    public ScoreWritable() {}

    public ScoreWritable(float chinese, float math, float english, float physics, float chemistry) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.physics = physics;
        this.chemistry = chemistry;
    }

    public void set(float chinese, float math, float english, float physics, float chemistry) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.physics = physics;
        this.chemistry = chemistry;
    }

    public float getChinese() { return chinese; }
    public float getMath() { return math; }
    public float getEnglish() { return english; }
    public float getPhysics() { return physics; }
    public float getChemistry() { return chemistry; }

    // Called when the object is written out: serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(chinese);
        out.writeFloat(math);
        out.writeFloat(english);
        out.writeFloat(physics);
        out.writeFloat(chemistry);
    }

    // Called when the data is read back: deserialization, so the object can be rebuilt
    @Override
    public void readFields(DataInput in) throws IOException {
        chinese = in.readFloat();
        math = in.readFloat();
        english = in.readFloat();
        physics = in.readFloat();
        chemistry = in.readFloat();
    }

    // Left as a stub: ScoreWritable is only used as a map output value here, never as a key
    @Override
    public int compareTo(Object o) {
        return 0;
    }
}
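Because ScoreWritable is only ever used as a map output value in this example, the framework never calls the stubbed compareTo(). If the type were ever used as a key (and therefore sorted), it would need a real ordering. A minimal sketch, assuming ordering by total score (my assumption, not part of the original example):

// Hypothetical ordering for ScoreWritable if it were used as a key:
// compare by total score (an assumed criterion, not from the original post).
@Override
public int compareTo(Object o) {
    ScoreWritable other = (ScoreWritable) o;
    float thisTotal  = chinese + math + english + physics + chemistry;
    float otherTotal = other.chinese + other.math + other.english + other.physics + other.chemistry;
    return Float.compare(thisTotal, otherTotal);
}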
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreCount extends Configured implements Tool {

    // map and reduce
    public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable> {
        @Override
        protected void map(Text key, ScoreWritable value, Context context)
                throws IOException, InterruptedException {
            // The custom InputFormat already produces (student, scores) pairs, so just pass them through
            context.write(key, value);
        }
    }

    public static class ScoreReducer extends Reducer<Text, ScoreWritable, Text, Text> {
        private Text text = new Text();

        @Override
        protected void reduce(Text key, Iterable<ScoreWritable> value, Context context)
                throws IOException, InterruptedException {
            float totalScore = 0.0f;
            float avgScore = 0.0f;
            // Each student key is expected to carry a single score record
            for (ScoreWritable sw : value) {
                totalScore = sw.getChinese() + sw.getEnglish() + sw.getMath()
                        + sw.getPhysics() + sw.getChemistry();
                avgScore = totalScore / 5;
            }
            text.set(totalScore + "\t" + avgScore);
            context.write(key, text);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = Job.getInstance(conf, "scorecount");
        job.setJarByClass(ScoreCount.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReducer.class);

        // Custom types require the map output types to be set explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ScoreWritable.class);

        // Set the custom input format
        job.setInputFormatClass(ScoreInputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {"hdfs://192.168.153.111:9000/input5",
                          "hdfs://192.168.153.111:9000/output15"};
        int res = ToolRunner.run(new ScoreCount(), args0);
        System.exit(res);
    }
}
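The driver above only declares the map output types. That is enough here, because TextOutputFormat does not type-check what the reducer writes, but it is also common to declare the final output types explicitly. An optional addition to run(), shown only as a refinement and not part of the original code:

// Optional: declare the reducer (final) output types explicitly
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);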
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {

    /*
     * Note: every input format needs a matching RecordReader.
     * Overriding createRecordReader() really just means returning our own reader,
     * here the custom ScoreRecordReader, which extends RecordReader and does the actual reading.
     */
    @Override
    public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ScoreRecordReader();
    }
}
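One caveat worth noting: the ScoreRecordReader below always opens the file at offset 0 and ignores the split boundaries, so if an input file were ever divided into several splits, every split would re-read the file from the start. A minimal safeguard, assuming this single-file tutorial setup, is to mark the format as non-splittable (my addition, not in the original code):

// Added safeguard inside ScoreInputFormat: keep each file in a single split,
// since the reader ignores split offsets and always reads from the start of the file.
// Requires: import org.apache.hadoop.fs.Path;
//           import org.apache.hadoop.mapreduce.JobContext;
@Override
protected boolean isSplitable(JobContext context, Path filename) {
    return false;
}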
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class ScoreRecordReader extends RecordReader<Text, ScoreWritable> {

    public LineReader in;            // line reader
    public Text lineKey;             // custom key type
    public ScoreWritable linevalue;  // custom value type
    public Text line;                // current line of text

    // Initialization; runs exactly once per split
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fsplit = (FileSplit) split;
        Configuration conf = context.getConfiguration();
        Path file = fsplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream filein = fs.open(file);
        in = new LineReader(filein, conf);
        line = new Text();
        lineKey = new Text();
        linevalue = new ScoreWritable();
    }

    // Called once for every record. This is the method to focus on and adapt to your needs;
    // the remaining methods are fairly boilerplate and can simply follow this pattern.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        int linesize = in.readLine(line);
        if (linesize == 0) {
            return false;
        }
        String[] pieces = line.toString().split("\\s+");
        if (pieces.length != 7) {
            throw new IOException("Invalid record: expected 7 fields");
        }
        // Convert each of the student's five scores to float
        float a = 0, b = 0, c = 0, d = 0, e = 0;
        try {
            a = Float.parseFloat(pieces[2].trim());
            b = Float.parseFloat(pieces[3].trim());
            c = Float.parseFloat(pieces[4].trim());
            d = Float.parseFloat(pieces[5].trim());
            e = Float.parseFloat(pieces[6].trim());
        } catch (NumberFormatException nfe) {
            nfe.printStackTrace();
        }
        lineKey.set(pieces[0] + "\t" + pieces[1]);  // build the custom key
        linevalue.set(a, b, c, d, e);               // build the custom value
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return lineKey;
    }

    @Override
    public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
        return linevalue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress reporting is not implemented in this example
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
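To make the data flow concrete, here is a small standalone sketch that parses one made-up input line the same way nextKeyValue() does; the field meanings and values are my assumption, not taken from the original post:

// Standalone sketch: one hypothetical 7-field record, parsed like ScoreRecordReader parses it.
public class ScoreLineDemo {
    public static void main(String[] args) {
        String line = "zhangsan class1 90 80 70 60 50";   // hypothetical input line
        String[] pieces = line.split("\\s+");
        String key = pieces[0] + "\t" + pieces[1];         // same key as the reader builds
        float total = 0f;
        for (int i = 2; i < pieces.length; i++) {
            total += Float.parseFloat(pieces[i].trim());   // the five subject scores
        }
        // Prints: zhangsan	class1	350.0	70.0  -- the total and average that ScoreCount finally writes
        System.out.println(key + "\t" + total + "\t" + (total / 5));
    }
}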
package com.vip09;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseEmail extends Configured implements Tool {

    public static class EmailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is an email address; emit it with a count of 1
            context.write(value, one);
        }
    }

    public static class EmailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        // To write to multiple files or directories, use MultipleOutputs
        private MultipleOutputs<Text, IntWritable> mout;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mout = new MultipleOutputs<Text, IntWritable>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int begin = key.toString().indexOf("@");
            int end = key.toString().indexOf(".");
            // Note: addresses with a dot before the "@" are silently skipped by this check
            if (begin >= end) {
                return;
            }
            // Extract the mail provider, e.g. qq or 163
            String name = key.toString().substring(begin + 1, end);
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            // Output files are named baseoutputpath-r-nnnnn
            mout.write(key, result, name);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mout.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        Job job = Job.getInstance(conf, "emailcount");
        job.setJarByClass(MapReduceCaseEmail.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(EmailMapper.class);
        job.setReducerClass(EmailReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {"hdfs://192.168.153.111:9000/input6",
                          "hdfs://192.168.153.111:9000/output16"};
        int res = ToolRunner.run(new MapReduceCaseEmail(), args0);
        System.exit(res);
    }
}
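Since every record is written through MultipleOutputs, the default part-r-nnnnn output stays empty. If those empty files are unwanted, one option (assuming the usual TextOutputFormat) is to register the output format lazily in run():

// Optional addition to run(): only create the default part-r-nnnnn files when
// something is actually written to them (here, nothing is, so they are not created).
// Requires: import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
//           import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);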