MR操做————Map、Partitioner、Shuffle、Combiners、Reduce 1.Map步驟 1.1 讀取輸入文件,解析成k-v對,其中每一個k-v對調用一次map函數 1.2 寫本身的邏輯,對輸入的k-v進行處理,轉換成新的k-v 1.3 對輸出的k-v進行分區(Partitioner) 1.4 對不一樣分區的數據進行排序/分組,將相同的key的value放在一個集合中(Shuffle處理) 1.5 分組後進行歸約(可選)(Combiners 可理解爲單個節點的reduce 不是全部算法都適合 如求平均數時就不適合,有可能形成結果不對) 2.Reduce步驟 2.1 對多個map輸出數據按照不一樣的分區分配到不一樣的reduce節點 2.2 對多個map任務輸出進行合併、排序。寫本身的邏輯,對輸入的k-v對進行處理,轉換成新的k-v,每一個k-v對調用一次reduce函數 2.3 把reduce結果保存到文件中輸出 3.流程圖: 文件輸入 ->Map提取相應數據(按照必定格式) ->Partitioner(按照key進行分區) ->Shuffle(按照分區進行排序) ->Combiners(按照分區進行歸約) ->推送到各個Redeuce節點 ->Reduce(根據輸入數據按照邏輯進行運行) 文件輸出 4.JavaAPI 步驟: 4.1 定義一個類繼承Mapper,其中泛型四個參數 分別是輸入key,輸入value,輸出key,輸出value(注意類型需使用Hadoop中的數據類型,參見java類型對應Hadoop類型) 4.2 重寫map()方法 4.3 定義一個類繼承Reducer,其中泛型四個參數 分別是輸入key,輸入value,輸出key,輸出value(注意此時輸入參數應該是Mapper的輸出參數) 4.4 重寫reduce()方法 4.5 編寫啓動任務 1.獲得Configuration對象 2.獲得Job對象 3.根據MR步驟設置Job參數 第一步:輸入文件 輸入目錄 輸入數據類型處理 第二步:自定義Mapper類 輸入自定義Mapper類 輸出數據類型(K-V)處理 第三步:(可不選) 分區 第四步:(可不選) 排序分組 第五步:(可不選) 歸約 第六步:(可不選) 分配到不一樣Reduce節點 第七步:自定義Reduce類 輸出自定義Reduce類 輸出數據類型(K-V)處理 第八步:輸出文件 輸出路徑 輸出格式類 4.6 打包發佈運行 5.Hadoop內置數據類型 5.1 基礎數據類型 1.BooleanWritable:標準布爾型數值 2.ByteWritable:單字節數值 3.DoubleWritable:雙字節數值 4.FloatWritable:浮點數 5.IntWritable:整型數 6.LongWritable:長整型數 7.Text:使用UTF8格式存儲的文本 8.NullWritable:當<key, value>中的key或value爲空時使用 5.2 自定義數據類型 1.繼承接口Writable,實現其方法write()和readFields(), 以便該數據能被序列化後完成網絡傳輸或文件輸入/輸出 2.若是該數據須要做爲主鍵key使用,或須要比較數值大小時,則須要實現WritalbeComparable接口,實現其方法write(),readFields(),CompareTo() 6.MR中經常使用算法 1.單詞計數 2.數據去重 3.排序 4.選擇 5.分組 6.單表關聯 7.多表鏈接 JavaAPI 示例代碼: package testHadoopMR; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class testHadoopMR { static final String INPUT_PATH = "hdfs://hadoop:9000/input/file.ini"; //輸入文件 static final String OUTPUT_PATH = "hdfs://hadoop:9000/out";//輸出路徑 public static void main(String[] args) throws Exception { Configuration conf = new Configuration();//獲得configuration對象 Job job = new Job(conf, "word count");//獲得job對象 job.setJarByClass(testHadoopMR.class);//設置任務加載類 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));//1.設置輸入文件路徑 job.setMapperClass(MyMapper.class);//2.自定義mapper job.setMapOutputKeyClass(Text.class);//輸出key類型 job.setMapOutputValueClass(IntWritable.class);//輸出value類型 job.setPartitionerClass(HashPartitioner.class);//3.設置分區 job.setNumReduceTasks(1);//分區數爲1 job.setGroupingComparatorClass(MyGroupingComparator.class);//4.設置排序和分組 需自定義排序分組類,否則會按照hash值排序 job.setCombinerClass(MyReducer.class);// 5.設置Map規約Combiner job.setReducerClass(MyReduce.class);//7.自定義reduce job.setOutputKeyClass(Text.class);//設置輸出 key類型 job.setOutputValueClass(IntWritable.class);//設置輸出value類型 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//8.設置輸出路徑 job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }