MapReduce實現wordcount統計

繼承Mapper的泛型java

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>數組

LongWritable->起始偏移量緩存

Text->輸入的文本網絡

Text->輸出的文本app

LongWritable->計數框架

4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型ide

map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的oop

默認狀況下,框架傳遞給咱們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容做爲valuespa

序列化問題: code

key-value數據要在網絡中傳輸,必須實現序列化,java自帶序列化功能,可是數據比較冗餘,對於MapReduce海量數據分析過程當中會有不利,所以實現hadoop本身的序列化。

 繼承Mapper,重寫map方法

mapreduce框架每讀一行數據就調用一次該方法

具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value

 

@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {

}

 

 實現業務邏輯代碼:

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
	
	//mapreduce框架每讀一行數據就調用一次該方法
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value
		//key 是這一行數據的起始偏移量     value 是這一行的文本內容
		
		//將這一行的內容轉換成string類型
		String line = value.toString();
		
		//對這一行的文本按特定分隔符切分
		String[] words = StringUtils.split(line, " ");
		
		//遍歷這個單詞數組輸出爲kv形式  k:單詞   v : 1
		for(String word : words){
			
			context.write(new Text(word), new LongWritable(1));
			
		}
		

	}

 

繼承Reducer實現reduce方法

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	
	
	//框架在map處理完成以後,將全部kv對緩存起來,進行分組,而後傳遞一個組<key,valus{}>,調用一次reduce方法
	//<hello,{1,1,1,1,1,1.....}>
	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,Context context)
			throws IOException, InterruptedException {

		long count = 0;
		//遍歷value的list,進行累加求和
		for(LongWritable value:values){
			
			count += value.get();
		}
		
		//輸出這一個單詞的統計結果
		
		context.write(key, new LongWritable(count));
		
	}
	
	

}

 

 

map、reduce代碼分別完成後,還須要有一個類,用來描述整個邏輯:

map分佈在哪,reduce分佈在哪;用哪一個map,哪一個reduce?還須要打成jar包。

把一個業務邏輯處理的整個過程叫作一個job,告訴集羣用哪一個job,哪一個工程,哪一個map,reduce,處理數據的路徑,輸出的結果等。

/**
 * 用來描述一個特定的做業
 * 好比,該做業使用哪一個類做爲邏輯處理中的map,哪一個做爲reduce
 * 還能夠指定該做業要處理的數據所在的路徑
 * 還能夠指定改做業輸出的結果放到哪一個路徑
 * ....
 * @author duanhaitao@itcast.cn
 *
 */
public class WCRunner {

	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		
		Job wcjob = Job.getInstance(conf);
		
		//設置整個job所用的那些類在哪一個jar包
		wcjob.setJarByClass(WCRunner.class);
		
		
		//本job使用的mapper和reducer的類
		wcjob.setMapperClass(WCMapper.class);
		wcjob.setReducerClass(WCReducer.class);
		
		
		//指定reduce的輸出數據kv類型
		wcjob.setOutputKeyClass(Text.class);
		wcjob.setOutputValueClass(LongWritable.class);
		
		//指定mapper的輸出數據kv類型
		wcjob.setMapOutputKeyClass(Text.class);
		wcjob.setMapOutputValueClass(LongWritable.class);
		
		
		//指定要處理的輸入數據存放路徑
		FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend110:9000/wc/srcdata/"));
		
		//指定處理結果的輸出數據存放路徑
		FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend110:9000/wc/output3/"));
		
		//將job提交給集羣運行 
		wcjob.waitForCompletion(true);
			
	}
			
}

 

把工程打成jar包,上傳d到hadoop集羣中

啓動hadoop yarn

hadoop jar 指定類 

相關文章
相關標籤/搜索