WordMap.java
package MyMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Mapper of the word-count job: for every input value it emits one
 * {@code (word, 1)} pair per whitespace-separated token.
 */
public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** The constant count written with every word; never mutated. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Output key reused across {@code context.write} calls instead of allocating a
     * new {@link Text} per token (same pattern as the official Hadoop WordCount example).
     */
    private final Text word = new Text();

    /**
     * Splits {@code value} into tokens and writes {@code (token, 1)} for each one.
     *
     * <p>Tokens are separated by any run of whitespace (space, tab, newline, carriage
     * return, form feed), so consecutive, leading, or trailing separators never
     * produce an empty "word".
     *
     * @param key     input key; not used by this mapper
     * @param value   the text to tokenize
     * @param context job context that receives the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // StringTokenizer's default delimiters are " \t\n\r\f" and it skips empty tokens.
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}
WordReduce.java
package MyReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer of the word-count job: adds up all counts received for a key and
 * writes a single {@code (key, total)} pair.
 */
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Output value reused across keys instead of allocating one {@link IntWritable} per key. */
    private final IntWritable total = new IntWritable();

    /**
     * Sums {@code values} and writes the result under {@code key}.
     *
     * <p>The sum is accumulated in a 32-bit {@code int}; it wraps around silently if
     * the total exceeds {@link Integer#MAX_VALUE}.
     *
     * @param key     the word being counted
     * @param values  the partial counts for {@code key}
     * @param context job context that receives the {@code (key, total)} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Primitive accumulator: avoids boxing/unboxing an Integer on every iteration.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
WordMain.java
package MyMain;

import MyMap.WordMap;
import MyReduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver of the word-count job: wires {@link WordMap} and {@link WordReduce}
 * into a {@link Job}, submits it, and reports the outcome.
 */
public class WordMain {

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * <p>Exits the JVM with status 2 when fewer than two arguments are given and
     * with status 1 when the job fails; returns normally when the job succeeds.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output
     *             path; any further arguments are ignored
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordMain <input path> <output path>");
            System.exit(2);
        }

        // Create the configuration object.
        Configuration configuration = new Configuration();
        // Create the job object.
        Job job = Job.getInstance(configuration, "mineWordCount");
        System.out.println("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");

        // Set the class that identifies the job's jar.
        job.setJarByClass(WordMain.class);
        // Set the mapper class.
        job.setMapperClass(WordMap.class);
        // WordReduce only sums its inputs and its input and output types are the
        // same, so it can also run as a combiner to reduce the data sent to reducers.
        job.setCombinerClass(WordReduce.class);
        // Set the reducer class.
        job.setReducerClass(WordReduce.class);

        // Set the key/value types of the map output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the key/value types of the reduce output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish.
        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            System.out.println("----------->success");
        } else {
            System.out.println("----------->error");
            // Propagate the failure to the caller (shell, scheduler) via the exit status.
            System.exit(1);
        }
    }
}