WordCount是寫hadoop mapreduce入門級程序,會寫wordcount的話,基本上80%的mapreduce就懂了。java
mapreduce分爲map過程和reduce過程,用戶能夠根據本身的業務自定義map過程和reduce過程。apache
以wordcount爲例,要計算文本中單詞出現的個數,須要讀取文本,並針對單詞進行統計。服務器
package com.hadoop.mapreduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * * Created by Frankie on 2018/1/14. * KEYIN: 默認狀況下,是mr框架所讀到的一行文本的起始偏移量, Long 在hadoop中有本身的精簡序列化接口,因此不直接使用long, 而用LongWritable * VALUEIN: 默認狀況下,是mr框架所讀到的一行文本的內容, String * KEYOUT: 是用戶自定義邏輯處理完成後輸出數據中的key, 在此處是單詞,String * VALUEOUT: 是用戶自定義邏輯處理完成後輸出數據中的vlaue, 在次數是單詞次數,Integer * * **/ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /* * map 階段的業務邏輯就寫在自定義的Map()方法中 * map task會對每一行輸入數據調用一次咱們自定義map()方法 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] words = line.split(" "); for(String word: words){ // 將單詞做爲key, 將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會用到相同的reduce task // map task會收集,寫在一個文件上 context.write(new Text(word), new IntWritable(1)); } } }
package com.hadoop.mapreduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * Created by Frankie on 2018/1/14. * * KEYIN, VALUEIN 對應 mapper輸出的KEYOUT, VALUEOUT類型對應 * KEYOUT, VALUEOUT是自定義reduce邏輯處理結果的輸出數據類型 * KEYOUT是單詞, * VALUE是總次數 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ /* 入參key, 是一組相同單詞kv對的key Context 上下文 * */ int count = 0; // Iterator<IntWritable> iterator = values.iterator(); // while(iterator.hasNext()){ // count += iterator.next().get(); // } // for( IntWritable value: values){ count += value.get(); } context.write(key, new IntWritable(count)); } }
mapreduce過程存在一些問題,好比,app
Map task如何進行任務分配?框架
Reduce task如何進行任務分配?ide
Map task與 reduce task如何進行銜接?oop
若是某map task 運行失敗,如何處理?code
Map task若是都要本身負責數據的分區,很麻煩orm
爲例解決這些問題,須要有個master專門對map reduce進行管理。接口
在WordCount文件中,有專門對做業進行配置,以及最後將代碼提交到客戶端。
package com.hadoop.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Created by Frankie on 2018/1/14. * * 至關於yarn集羣的客戶端 * 須要在此封裝mr程序的相關運行參數,指定jar包,最後提交給yarn */ public class WordCount { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "wordcount"); // 指定本程序的jar包所在的本地路徑 job.setJarByClass(WordCount.class); // 指定本業務使用的map業務類 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的輸入原始文件所在目錄 // /data/adult.data FileInputFormat.setInputPaths(job, new Path(args[1])); // 指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path(args[2])); // // 將job中配置的相關參數,以及job所用的java類所在的Jar包,提交給yarn去運行 // job.submit(); // 提交job配置,一直等待到運行結束 boolean res = job.waitForCompletion(true); System.exit(res? 0: 1); } }
代碼編輯完成後,對代碼進行打包。咱們在這裏選擇不依賴第三方包的打包方式進行打包。
打完包後,將生成的jar包提交到服務器中去。 並執行,
leiline@master:~/Documents/hadoop/myJars$ hadoop jar HadoopMapReduce.jar com.hadoop.mapreduce.WordCount /data/adult /data/out
注意,out文件是由程序自動建立的,不須要用戶手動去建立。最後,代碼執行完畢後,能夠在hdfs中看到執行的結果:
Found 2 items -rw-r--r-- 3 leiline supergroup 0 2018-01-14 19:01 /data/out/_SUCCESS -rw-r--r-- 3 leiline supergroup 216737 2018-01-14 19:01 /data/out/part-r-00000