Word Count

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
// Map phase: the byte offset of each input line is the key, and the text of the line is the value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 
	// Reusable output key, and a constant count of 1 emitted for every word
	private Text k = new Text();
	private IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
	     
		// 1 Convert the Text value of the current line to a String
		String line = value.toString();
		
		// 2 Split the line into words on spaces
		String[] words = line.split(" ");
		
		// 3 Emit a (word, 1) key-value pair for every word
		for (String word : words) {
			String trim = word.trim();
			if(!" ".equals(trim)){
				k.set(trim);
				// 4 map階段將單詞拆分,並不合併,因此固定值爲1
				context.write(k, v);
			}
		}
	}
	
}
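For example, given the input line "hello world hello", this mapper emits (hello, 1), (world, 1), (hello, 1); the framework then groups these pairs by key before passing them to the reducer.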
import java.io.IOException;
import java.util.Iterator;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
// Reduce phase: the output of the Map phase is the input of the Reduce phase
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
 
	
	// The reduce method is called exactly once for each distinct key
	@Override
	protected void reduce(Text text, Iterable<IntWritable> iterable, Context context) throws IOException, InterruptedException {
	    
		// 1 Sum all values associated with this key from the map phase
		int sum = 0;
		Iterator<IntWritable> iterator = iterable.iterator();
		while(iterator.hasNext()){
			 sum += iterator.next().get();
		}
		if (!text.toString().trim().isEmpty()) {
			// 2 Write out the aggregated count for this key
			context.write(text, new IntWritable(sum));
		}
	}
	
}
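Continuing the example above, the reducer receives (hello, [1, 1]) and (world, [1]) and writes (hello, 2) and (world, 1) to the output.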
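The post stops at the Mapper and Reducer, so below is a minimal driver sketch showing how the two classes could be wired into a job and submitted. The class name WordCountDriver, the use of the Reducer as a Combiner, and the assumption that args[0] and args[1] hold the input and output paths are illustrative choices, not part of the original code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver: wires the Mapper and Reducer into a Job and submits it
public class WordCountDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "word count");

		// Jar containing the job classes
		job.setJarByClass(WordCountDriver.class);

		// Mapper and Reducer defined above; the Reducer also serves as a Combiner here
		job.setMapperClass(WordCountMapper.class);
		job.setCombinerClass(WordCountReducer.class);
		job.setReducerClass(WordCountReducer.class);

		// Output key/value types of the job
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Input and output paths taken from the command line (assumed: args[0] = input, args[1] = output)
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Submit the job and wait for it to finish
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Packaged into a jar, the job could then be launched with something like: hadoop jar wordcount.jar WordCountDriver /input /output (the jar name and paths are placeholders).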
