package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseAvg extends Configured implements Tool {

    public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the line content
            String content = value.toString();
            // Split the line into whitespace-separated tokens
            StringTokenizer st = new StringTokenizer(content);
            while (st.hasMoreTokens()) {
                String strName = st.nextToken();   // student name
                String strScore = st.nextToken();  // student score
                // Emit <name, score>
                context.write(new Text(strName), new IntWritable(Integer.parseInt(strScore)));
            }
        }
    }

    public static class AvgReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // e.g. input: <張三, {98, 89, 79}>
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The average is the sum of all scores divided by the number of subjects
            int sum = 0; // total score
            int num = 0; // number of subjects
            for (IntWritable score : values) {
                sum += score.get(); // accumulate the score of each subject
                num++;
            }
            context.write(key, new IntWritable(sum / num));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Job and parameters
        Job job = Job.getInstance(getConf(), "avg mr");
        job.setJarByClass(MapReduceCaseAvg.class);
        /* Set the mapper and reducer classes */
        job.setMapperClass(AvgMapper.class);
        job.setReducerClass(AvgReducer.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for it to finish */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceCaseAvg(), args);
        System.exit(res);
    }
}
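MapReduceCaseAvg expects each input line to carry whitespace-separated name/score pairs and writes one average per student; note that sum / num is integer division, so the average is truncated toward zero. As an illustrative sketch (the file contents, jar name and paths below are assumptions, not part of the original code), an input containing

    Tom 98 Tom 89 Tom 79
    Jerry 60 Jerry 71

would produce

    Jerry	65
    Tom	88

and the job could be launched with something like: hadoop jar <your-jar> com.vip.MapReduceCaseAvg /input/scores /output/avg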
package com.vip;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseFilte extends Configured implements Tool {

    public static class FilterMapper extends Mapper<Object, Text, NullWritable, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on spaces
            String[] strSrc = value.toString().split(" ");
            // Keep only the wanted fields and re-join them
            String strDst = strSrc[0] + " " + strSrc[1] + " " + strSrc[2] + " " + strSrc[6];
            context.write(NullWritable.get(), new Text(strDst));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "mrfilter");
        job.setJarByClass(MapReduceCaseFilte.class);
        /* Set the mapper class (no reducer is configured) */
        job.setMapperClass(FilterMapper.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for it to finish */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MapReduceCaseFilte(), args);
        System.exit(res);
    }
}
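FilterMapper assumes every input line has at least seven space-separated fields and projects out fields 0, 1, 2 and 6 (a layout typical of log-style records); a shorter line would throw an ArrayIndexOutOfBoundsException. Because no reducer class is set, the default identity reducer still runs with one reduce task. If a pure map-side projection is intended, one extra line in run() (an optional tweak, not in the original code) skips the reduce phase entirely:

    job.setNumReduceTasks(0);  // write mapper output straight to the output path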
package com.vip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceCaseWords extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        // Set up the job
        Job job = Job.getInstance(conf, "words mr");
        job.setJarByClass(MapReduceCaseWords.class);
        /* Set the mapper and reducer classes */
        job.setMapperClass(WordsMapper.class);
        job.setReducerClass(WordsReducer.class);
        /* Set the output key and value types */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /* Set the input and output paths */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        /* Submit the job to the cluster and wait for it to finish */
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Paths are hard-coded here; the command-line arguments are ignored
        String[] args0 = {"hdfs://192.168.153.111:9000/input5",
                "hdfs://192.168.153.111:9000/output12"};
        int res = ToolRunner.run(new MapReduceCaseWords(), args0);
        System.exit(res);
    }
}
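MapReduceCaseWords ignores its command-line arguments and hard-codes the HDFS input and output URIs (hdfs://192.168.153.111:9000/...), so it only runs against that particular cluster. A minimal alternative entry point, assuming the paths should come from the command line like the other two drivers, would be:

    public static void main(String[] args) throws Exception {
        // Pass the paths through unchanged: args[0] = input, args[1] = output
        int res = ToolRunner.run(new MapReduceCaseWords(), args);
        System.exit(res);
    }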
package com.vip;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordsMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyText = new Text();
    private Text valueText = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String word = value.toString();
        char[] wordChars = word.toCharArray(); // convert the word to a char array
        Arrays.sort(wordChars);                // sort the characters
        String sword = new String(wordChars);  // turn the sorted chars back into a string
        // Words made of the same letters share the same sorted key:
        // "cat", "tac" and "cta" all map to "act", "tar" maps to "art",
        // so the reducer later receives <act, {cat, tac, cta}>.
        keyText.set(sword);   // set the output key
        valueText.set(word);  // set the output value
        context.write(keyText, valueText); // emit <sorted letters, original word>
    }
}
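WordsMapper treats the whole input line as a single word, so the job assumes one word per line in its input. As a small illustration (the three input words are made up), the pairs emitted for the lines cat, tac and tar would be:

    cat  ->  <act, cat>
    tac  ->  <act, tac>
    tar  ->  <art, tar>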
package com.vip;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordsReducer extends Reducer<Text, Text, Text, Text> {

    private Text outputKey = new Text();   // output key
    private Text outputValue = new Text(); // output value

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String output = "";
        // Join all words made of the same letters, using ~ as the separator
        for (Text word : values) {
            if (!output.equals("")) {
                output = output + "~";
            }
            output = output + word.toString();
        }
        // Only emit groups that contain two or more words
        StringTokenizer outputTokenize = new StringTokenizer(output, "~");
        if (outputTokenize.countTokens() >= 2) {
            output = output.replaceAll("~", ",");
            outputKey.set(key.toString()); // set the key
            outputValue.set(output);       // set the value
            context.write(outputKey, outputValue); // emit <sorted letters, word list>
        }
    }
}
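For the sample input above the reducer sees <act, {cat, tac}> and <art, {tar}>; only the first group has at least two members, so the job's output is a single line with the key act, a tab, and the value cat,tac. The two passes over the joined string (building it with "~", re-tokenizing it, then replacing "~" with ",") can be collapsed into one loop; a sketch of an equivalent reduce body (a simplification for illustration, not the original code) is:

    StringBuilder sb = new StringBuilder();
    int count = 0;                       // number of words in this anagram group
    for (Text word : values) {
        if (count > 0) {
            sb.append(",");              // join directly with commas
        }
        sb.append(word.toString());
        count++;
    }
    if (count >= 2) {                    // only emit groups with at least two words
        context.write(key, new Text(sb.toString()));
    }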