輸入經過InputFormat讀取,每讀一行交由map處理,通過Shuffling分序丟到Reducing上面處理,最後經過OutputFormat把記錄輸出到文件系統(HDFS)上面去。
Java源碼:
package com.cracker.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * WordCount application implemented with MapReduce.
 *
 * <p>The mapper emits {@code (word, 1)} for every non-empty, space-separated
 * token of each input line; the reducer sums the counts per word. The driver
 * in {@link #main(String[])} wires both into a job named "wordcount".
 */
public class WordCountApp {

    /**
     * Map phase: splits each input line into words and emits {@code (word, 1)}.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Constant count of 1 emitted for every word occurrence. */
        final LongWritable one = new LongWritable(1);

        /**
         * Reused output key, so no new {@code Text} is allocated per token.
         * This follows the pattern of the official Hadoop WordCount example.
         */
        private final Text word = new Text();

        /**
         * Processes one line of input.
         *
         * @param key     the input key supplied by the input format (unused here)
         * @param value   the content of the current line
         * @param context the context used to emit map output
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The line currently being processed.
            String line = value.toString();

            // Split on the single-space delimiter.
            for (String token : line.split(" ")) {
                // split(" ") yields empty strings for blank lines, leading spaces and
                // runs of consecutive spaces; those are not words and must not be counted.
                if (token.isEmpty()) {
                    continue;
                }
                word.set(token);
                // Emit the map result through the context.
                context.write(word, one);
            }
        }
    }

    /**
     * Reduce phase: sums all counts received for a word.
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Reused output value holding the total for the current key. */
        private final LongWritable total = new LongWritable();

        /**
         * Adds up every count received for {@code key} and emits {@code (key, sum)}.
         *
         * @param key     the word
         * @param values  the counts emitted for that word
         * @param context the context used to emit the final result
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                // Accumulate the number of occurrences of the key.
                sum += value.get();
            }
            total.set(sum);
            // Emit the final count for this word.
            context.write(key, total);
        }
    }

    /**
     * Driver: configures the MapReduce job and submits it.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be configured or fails while running
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCountApp <input path> <output path>");
            System.exit(2);
        }

        // Create the Configuration.
        Configuration configuration = new Configuration();

        // Create the Job.
        Job job = Job.getInstance(configuration, "wordcount");

        // Set the class used to locate the job jar.
        job.setJarByClass(WordCountApp.class);

        // Set the input path of the job.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Map-related settings.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Summation is associative and commutative and the reducer's input and output
        // types are identical, so the reducer can also run as a combiner to pre-aggregate
        // map output before the shuffle.
        job.setCombinerClass(MyReducer.class);

        // Reduce-related settings.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Set the output path of the job.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
相關命令
本地編譯
mvn clean package -DskipTests
服務器
hadoop jar /root/app/hadoop-train-1.0.jar com.cracker.hadoop.mapreduce.WordCountApp hdfs://localhost:8020/hello.txt hdfs://localhost:8020/output/wc