MR操做

MR操做————Map、Partitioner、Shuffle、Combiners、Reduce
1.Map步驟
	1.1	讀取輸入文件,解析成k-v對,其中每一個k-v對調用一次map函數
	1.2 寫本身的邏輯,對輸入的k-v進行處理,轉換成新的k-v
	1.3 對輸出的k-v進行分區(Partitioner)
	1.4 對不一樣分區的數據進行排序/分組,將相同的key的value放在一個集合中(Shuffle處理)
	1.5 分組後進行歸約(可選)(Combiners 可理解爲單個節點的reduce  不是全部算法都適合  如求平均數時就不適合,有可能形成結果不對)
2.Reduce步驟
	2.1	對多個map輸出數據按照不一樣的分區分配到不一樣的reduce節點
	2.2 對多個map任務輸出進行合併、排序。寫本身的邏輯,對輸入的k-v對進行處理,轉換成新的k-v,每一個k-v對調用一次reduce函數
	2.3 把reduce結果保存到文件中輸出
3.流程圖:
	文件輸入 
		->Map提取相應數據(按照必定格式)
		->Partitioner(按照key進行分區)
		->Shuffle(按照分區進行排序)
		->Combiners(按照分區進行歸約)
		->推送到各個Redeuce節點
		->Reduce(根據輸入數據按照邏輯進行運行)
	文件輸出
4.JavaAPI
	步驟:
		4.1 定義一個類繼承Mapper,其中泛型四個參數 分別是輸入key,輸入value,輸出key,輸出value(注意類型需使用Hadoop中的數據類型,參見java類型對應Hadoop類型)
		4.2 重寫map()方法
		4.3 定義一個類繼承Reducer,其中泛型四個參數 分別是輸入key,輸入value,輸出key,輸出value(注意此時輸入參數應該是Mapper的輸出參數)
		4.4 重寫reduce()方法
		4.5	編寫啓動任務
			1.獲得Configuration對象
			2.獲得Job對象
			3.根據MR步驟設置Job參數
				第一步:輸入文件
					輸入目錄
					輸入數據類型處理
				
				第二步:自定義Mapper類
					輸入自定義Mapper類
					輸出數據類型(K-V)處理
				
				第三步:(可不選)
					分區
				
				第四步:(可不選)
					排序分組
				
				第五步:(可不選)
					歸約
				第六步:(可不選)
					分配到不一樣Reduce節點
					
				第七步:自定義Reduce類
					輸出自定義Reduce類
					輸出數據類型(K-V)處理
					
				第八步:輸出文件
					輸出路徑
					輸出格式類
		4.6 打包發佈運行
	
5.Hadoop內置數據類型
	5.1 基礎數據類型
			1.BooleanWritable:標準布爾型數值
			2.ByteWritable:單字節數值
			3.DoubleWritable:雙字節數值
			4.FloatWritable:浮點數
			5.IntWritable:整型數
			6.LongWritable:長整型數
			7.Text:使用UTF8格式存儲的文本
			8.NullWritable:當<key, value>中的key或value爲空時使用
	5.2 自定義數據類型
			1.繼承接口Writable,實現其方法write()和readFields(), 以便該數據能被序列化後完成網絡傳輸或文件輸入/輸出
			2.若是該數據須要做爲主鍵key使用,或須要比較數值大小時,則須要實現WritalbeComparable接口,實現其方法write(),readFields(),CompareTo() 

6.MR中經常使用算法
	1.單詞計數
	2.數據去重
	3.排序
	4.選擇
	5.分組
	6.單表關聯
	7.多表鏈接
			
JavaAPI 示例代碼:		
	package testHadoopMR;

	import java.io.IOException;
	import java.util.StringTokenizer;

	import org.apache.hadoop.conf.*;
	import org.apache.hadoop.fs.*;
	import org.apache.hadoop.io.*;
	import org.apache.hadoop.mapred.*;
	import org.apache.hadoop.mapreduce.*;
	import org.apache.hadoop.mapreduce.Mapper;
	import org.apache.hadoop.mapreduce.Reducer;
	import org.apache.hadoop.mapreduce.Mapper.Context;
	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


	public class testHadoopMR {
		static final String INPUT_PATH = "hdfs://hadoop:9000/input/file.ini";  //輸入文件
		static final String OUTPUT_PATH = "hdfs://hadoop:9000/out";//輸出路徑

		public static void main(String[] args) throws Exception {
			Configuration conf = new Configuration();//獲得configuration對象
			Job job = new Job(conf, "word count");//獲得job對象
			job.setJarByClass(testHadoopMR.class);//設置任務加載類

			FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));//1.設置輸入文件路徑

			job.setMapperClass(MyMapper.class);//2.自定義mapper
			job.setMapOutputKeyClass(Text.class);//輸出key類型
			job.setMapOutputValueClass(IntWritable.class);//輸出value類型
			
			job.setPartitionerClass(HashPartitioner.class);//3.設置分區
			job.setNumReduceTasks(1);//分區數爲1
			
			job.setGroupingComparatorClass(MyGroupingComparator.class);//4.設置排序和分組 需自定義排序分組類,否則會按照hash值排序
			
			
			job.setCombinerClass(MyReducer.class);// 5.設置Map規約Combiner
			
			job.setReducerClass(MyReduce.class);//7.自定義reduce
			job.setOutputKeyClass(Text.class);//設置輸出 key類型
			job.setOutputValueClass(IntWritable.class);//設置輸出value類型

			FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//8.設置輸出路徑
			job.waitForCompletion(true);
		}

		public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

			private final static IntWritable one = new IntWritable(1);
			private Text word = new Text();

			public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
				StringTokenizer itr = new StringTokenizer(value.toString());
				while (itr.hasMoreTokens()) {
					word.set(itr.nextToken());
					context.write(word, one);
				}
			}
		}

		public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
			private IntWritable result = new IntWritable();

			public void reduce(Text key, Iterable<IntWritable> values, Context context)
					throws IOException, InterruptedException {
				int sum = 0;
				for (IntWritable val : values) {
					sum += val.get();
				}
				result.set(sum);
				context.write(key, result);
			}
		}

	}
相關文章
相關標籤/搜索