一、Map任務處理html
1.1 讀取HDFS中的文件。每一行解析成一個<k,v>。每個鍵值對調用一次map函數。 <0,hello you> <10,hello me> java
1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換爲新的<k,v>輸出。 <hello,1> <you,1> <hello,1> <me,1>apache
1.3 對1.2輸出的<k,v>進行分區。默認分爲一個區。Partitioner數組
1.4 溢寫Split網絡
1.5 對不一樣分區中的數據進行排序(按照k)Sort。app
1.6 (可選)對分組後的數據進行歸約。Combiner函數
combiner是一個可選的本地reducer,能夠在map階段聚合數據。combiner經過執行單個map範圍內的聚合,減小經過網絡傳輸的數據量。oop
例如,一個聚合的計數是每一個部分計數的總和,用戶能夠先將每一箇中間結果取和,再將中間結果的和相加,從而獲得最終結果。post
求平均值的時候不能用,由於123的平均是2,12平均再和3平均結果就不對了。Combiner應該用於那種Reduce的輸入key/value與輸出key/value類型徹底一致,且不影響最終結果的場景,好比累加,最大值等。url
1.7 合併Merge
二、Reduce任務處理
2.1 拉取數據Fetch
2.2 合併Merge
2.3 Reduce
三、WordCount代碼
package mapreduce; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountApp { static final String INPUT_PATH = "hdfs://chaoren:9000/hello"; static final String OUT_PATH = "hdfs://chaoren:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUT_PATH); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath, true); } Job job = new Job(conf, WordCountApp.class.getSimpleName()); // 指定讀取的文件位於哪裏 FileInputFormat.setInputPaths(job, INPUT_PATH); // 指定如何對輸入的文件進行格式化,把輸入文件每一行解析成鍵值對 //job.setInputFormatClass(TextInputFormat.class); // 指定自定義的map類 job.setMapperClass(MyMapper.class); // map輸出的<k,v>類型。若是<k3,v3>的類型與<k2,v2>類型一致,則能夠省略 //job.setOutputKeyClass(Text.class); //job.setOutputValueClass(LongWritable.class); // 指定自定義reduce類 job.setReducerClass(MyReducer.class); // 指定reduce的輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定寫出到哪裏 FileOutputFormat.setOutputPath(job, outPath); // 指定輸出文件的格式化類 //job.setOutputFormatClass(TextOutputFormat.class); // 分區 //job.setPartitionerClass(clz); // 排序、分組、歸約 //job.setSortComparatorClass(clz); //job.setGroupingComparatorClass(clz); //job.setCombinerClass(clz); // 有一個reduce任務運行 //job.setNumReduceTasks(1); // 把job提交給jobtracker運行 job.waitForCompletion(true); } /** * * KEYIN 即K1 表示行的偏移量 * VALUEIN 即V1 表示行文本內容 * KEYOUT 即K2 表示行中出現的單詞 * VALUEOUT 即V2 表示行中出現的單詞的次數,固定值1 * */ static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException { String[] splited = v1.toString().split("\t"); for (String word : splited) { context.write(new Text(word), new LongWritable(1)); } }; } /** * KEYIN 即K2 表示行中出現的單詞 * VALUEIN 即V2 表示出現的單詞的次數 * KEYOUT 即K3 表示行中出現的不一樣單詞 * VALUEOUT 即V3 表示行中出現的不一樣單詞的總次數 */ static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException, InterruptedException { long times = 0L; for (LongWritable count : v2s) { times += count.get(); } ctx.write(k2, new LongWritable(times)); }; } }