繼承Mapper的泛型java
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>數組
LongWritable->起始偏移量緩存
Text->輸入的文本網絡
Text->輸出的文本app
LongWritable->計數框架
4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型ide
map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的oop
默認狀況下,框架傳遞給咱們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容做爲valuespa
序列化問題: code
key-value數據要在網絡中傳輸,必須實現序列化,java自帶序列化功能,可是數據比較冗餘,對於MapReduce海量數據分析過程當中會有不利,所以實現hadoop本身的序列化。
繼承Mapper,重寫map方法
mapreduce框架每讀一行數據就調用一次該方法
具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value
@Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { }
實現業務邏輯代碼:
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ //mapreduce框架每讀一行數據就調用一次該方法 @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //具體業務邏輯就寫在這個方法體中,並且咱們業務要處理的數據已經被框架傳遞進來,在方法的參數中 key-value //key 是這一行數據的起始偏移量 value 是這一行的文本內容 //將這一行的內容轉換成string類型 String line = value.toString(); //對這一行的文本按特定分隔符切分 String[] words = StringUtils.split(line, " "); //遍歷這個單詞數組輸出爲kv形式 k:單詞 v : 1 for(String word : words){ context.write(new Text(word), new LongWritable(1)); } }
繼承Reducer實現reduce方法
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ //框架在map處理完成以後,將全部kv對緩存起來,進行分組,而後傳遞一個組<key,valus{}>,調用一次reduce方法 //<hello,{1,1,1,1,1,1.....}> @Override protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException { long count = 0; //遍歷value的list,進行累加求和 for(LongWritable value:values){ count += value.get(); } //輸出這一個單詞的統計結果 context.write(key, new LongWritable(count)); } }
map、reduce代碼分別完成後,還須要有一個類,用來描述整個邏輯:
map分佈在哪,reduce分佈在哪;用哪一個map,哪一個reduce?還須要打成jar包。
把一個業務邏輯處理的整個過程叫作一個job,告訴集羣用哪一個job,哪一個工程,哪一個map,reduce,處理數據的路徑,輸出的結果等。
/** * 用來描述一個特定的做業 * 好比,該做業使用哪一個類做爲邏輯處理中的map,哪一個做爲reduce * 還能夠指定該做業要處理的數據所在的路徑 * 還能夠指定改做業輸出的結果放到哪一個路徑 * .... * @author duanhaitao@itcast.cn * */ public class WCRunner { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //設置整個job所用的那些類在哪一個jar包 wcjob.setJarByClass(WCRunner.class); //本job使用的mapper和reducer的類 wcjob.setMapperClass(WCMapper.class); wcjob.setReducerClass(WCReducer.class); //指定reduce的輸出數據kv類型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(LongWritable.class); //指定mapper的輸出數據kv類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(LongWritable.class); //指定要處理的輸入數據存放路徑 FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend110:9000/wc/srcdata/")); //指定處理結果的輸出數據存放路徑 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend110:9000/wc/output3/")); //將job提交給集羣運行 wcjob.waitForCompletion(true); } }
把工程打成jar包,上傳d到hadoop集羣中
啓動hadoop yarn
hadoop jar 指定類