MapReduce實現WordCount, 及其優化

WordCount: 單詞計數, 統計文本文件中每個單詞出現的次數java


定義Mapper類, 該類繼承org.apache.hadoop.mapreduce.Mapperapache

並重寫map()方法app

public static class TokenizerMapper extends
			Mapper<LongWritable, Text, Text, IntWritable> {
	        // 定義一個靜態成員變量, 而且是不可變的, 避免每一次調用map()方法時, 建立重複對象
		private final static IntWritable one = new IntWritable(1);
		// 定義一個成員變量, 可變, 每一次調用map()方法時, 只須要調用Text.set()方法賦新值
		private Text word = new Text();

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] words = value.toString().split(" ");
			for (String item : words) {
				word.set(item);
				context.write(word, one);
			}
		}
	}

定義Reducer類, 該類繼承org.apache.hadoop.mapreduce.Reduceroop

重寫reduce()方法測試

public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		// 定義一個成員變量, 可變, 每一次調用reduce()方法時, 只須要調用IntWritable.set()方法賦新值
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

測試WordCountspa

public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(WordCount.class); // 設置job的主類
		job.setMapperClass(TokenizerMapper.class); // 設置Mapper類
		// 利用combiner來減小經過shuffle傳輸的數據量
		job.setCombinerClass(IntSumReducer.class); // 設置Combiner類
		job.setReducerClass(IntSumReducer.class); // 設置Reducer類
		job.setMapOutputKeyClass(Text.class); // 設置map階段輸出Key的類型
		job.setMapOutputValueClass(IntWritable.class); // 設置map階段輸出Value的類型
		job.setOutputKeyClass(Text.class); // 設置reduce階段輸出Key的類型
		job.setOutputValueClass(IntWritable.class); // 設置reduce階段輸出Value的類型
		// 設置job輸入路徑(從main方法參數args中獲取)
		FileInputFormat.addInputPath(job, new Path(args[0]));
		// 設置job輸出路徑(從main方法參數args中獲取)
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true); // 提交job
	}

輸入: code

words:orm

hello tom
hello jerry
hello kitty
hello world
hello tom

輸出:對象

hello	5
jerry	1
kitty	1
tom	2
world	1

減小對象的建立, 更少的GC, 確定會帶來更快的速度繼承

利用combiner來減小經過shuffle傳輸的數據量, 這是MapReduce做業調優的關鍵點之一

相關文章
相關標籤/搜索