/* * KEYIN 表示咱們當前讀取一個文件[qqq.txt] 讀到多少個字節了 數量詞 * VALUEIN 表示咱們當前讀的是文件的多少行 逐行讀取 表示咱們讀取的一行文字 * KEYOUT 咱們執行MAPPER以後 寫入到文件中KEY的類型 * VALUEOUT 咱們執行MAPPER以後 寫入到文件中VALUE的類型 * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { //去掉 , 。 ? char[] charArray = ivalue.toString().replace(",", "").replace("。", "").replace("?", "").toCharArray(); for (char c : charArray) { //寫入到一個臨時文件中 context.write(new Text(String.valueOf(c)), new IntWritable(1));//寫入到臨時文件當中 } } }
/** * KEYIN Text * VALUEIN IntWritbale * KEYOUT Text 咱們Reduce以後 這個文件中內容的 Key是什麼 * VALUEOUT IntWritable 這個文件中內容Value是什麼 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum=0; for(IntWritable value:values) { sum+=value.get(); } context.write(key,new IntWritable(sum)); } }
/** * Driver這個類 用來執行一個任務 Job * 任務=Mapper+Reduce+HDFS * 把他們3者 關聯起來 */ public class WordCountJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.43.61:9000"); //指定使用的hdfs文件系統 Job job = Job.getInstance(conf, "WordCount"); //任務名 job.setJarByClass(WordCountJob.class); //指定job類 // TODO: specify a mapper job.setMapperClass(WordCountMapper.class); //指定mapper類 // TODO: specify a reducer job.setReducerClass(WordCountReduce.class); //指定reduce類 job.setMapOutputKeyClass(Text.class); //指定map輸出的key數據格式 job.setMapOutputValueClass(IntWritable.class); //指定map輸出的value數據格式 // TODO: specify output types job.setOutputKeyClass(Text.class); //指定reduce輸出的key數據格式 job.setOutputValueClass(IntWritable.class); //指定reduce輸出的value數據格式 // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path("/琵琶行.txt")); //指定須要計算的文件或文件夾 FileOutputFormat.setOutputPath(job, new Path("/out1/")); //指定輸出文件保存位置,此文件夾不得存在 if (!job.waitForCompletion(true)) return; } }