import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class QuChong {
    /**
     * Data deduplication, using the idea of parallelization: the mapper emits
     * each input line as the key, so the shuffle groups identical lines and
     * the reducer writes each distinct line exactly once.
     * @author hadoop
     */
    public static class Engine extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; the value carries no information.
            String line = value.toString();
            context.write(new Text(line), new Text(""));
        }
    }

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // All duplicates of a line arrive under the same key; write the key once.
            context.write(key, new Text(""));
        }
    }
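    /*
     * A minimal sketch of the data flow, assuming a hypothetical input file
     * with duplicate records (the records themselves are made up):
     *
     *   input:          2012-3-1 a
     *                   2012-3-2 b
     *                   2012-3-1 a
     *
     *   map output:     ("2012-3-1 a", ""), ("2012-3-2 b", ""), ("2012-3-1 a", "")
     *   after shuffle:  ("2012-3-1 a", ["", ""]), ("2012-3-2 b", [""])
     *   reduce output:  2012-3-1 a
     *                   2012-3-2 b
     *
     * Because every occurrence of a line becomes the same key, the reducer
     * (and the combiner) collapses duplicates by writing each key only once.
     */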
    public static void main(String[] args) throws Exception {
        // Set up the job configuration, including the input/output parameters (directories).
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: QuChong <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "data dedup");
        job.setJarByClass(QuChong.class);
        // Set the Map, Combine and Reduce classes.
        job.setMapperClass(Engine.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // Set the output key/value classes.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the input and output directories.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
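To run the job, one possible invocation (the jar name and HDFS paths below are only placeholders) is to package the class into a jar and submit it with the hadoop jar command, e.g. hadoop jar quchong.jar QuChong /user/hadoop/dedup_in /user/hadoop/dedup_out. GenericOptionsParser strips any generic Hadoop options first, and the two remaining arguments become the input and output directories. Note that the output directory must not already exist, otherwise FileOutputFormat will fail the job at submission time.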