在安裝並配置好Hadoop環境以後,須要運行一個實例來驗證配置是否正確,Hadoop就提供了一個簡單的wordcount程序,其實就是統計單詞個數的程序,這個程序能夠算是Hadoop中的「Hello World」了。java
MapReduce其實就是採用分而治之的思想,將大規模的數據分紅各個節點共同完成,而後再整合各個節點的結果,獲得最終的結果。這些分節點處理數據均可以作到並行處理,大大縮減了工做的複雜度。apache
MapReduce能夠分紅兩個階段,其實就是單詞拆成map和reduce,這實際上是兩個函數。map函數會產生一箇中間輸出,而後reduce函數接受多個map函數產生的一系列中間輸出而後再產生一個最終輸出。app
cd /usr/hadoop/hadoop-2.6.2/ sbin/start-dfs.sh sbin/start-yarn.sh
cd ~/ mkdir ~/file cd file echo "Hello World" > test1.txt echo "Hello Hadoop" > test2.txt
這樣就建立了兩個txt文件,裏面分別有一個字符串:Hello World,Hello Hadoop。咱們經過wordcount想要獲得的結果是這樣的:Hello 2,World 1,Hadoop 1。函數
hadoop fs -mkdir /input
建立好咱們能夠經過oop
hadoop fs -ls /
來查看結果:源碼分析
hadoop fs -put ~/file/test*.txt /input
一樣,咱們能夠經過spa
hadoop fs -ls /input
來查看是否上傳成功:3d
若是看不到任何結果,說明在hadoop的配置中存在問題,或者是防火牆沒有關閉,致使節點鏈接不通。code
hadoop jar /你的hadoop根目錄/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.2.jar wordcount /input /output
運行這條命令後,Hadoop會啓動一個JVM來運行MapReduce程序,並且會在集羣上建立一個output文件夾,將結果存在其中。orm
咱們來看看結果:
注意點:
這個目錄必定要填對,要否則會報jar不存在。
輸出文件夾必定要是空文件夾。
output文件夾中如今有兩個文件,咱們須要的結果在part-r-00000
這個文件夾中。
hadoop fs -cat /output/part-r-00000
咱們就能夠看到最終的wordcount結果了:
源碼:
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { IntWritable one = new IntWritable(1); Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
繼承Mapper類,重寫map方法。
咱們瞭解到mapreduce中數據都是經過<key,value>傳遞的。咱們能夠經過控制檯來看看其中的value值和key值是什麼樣的。在map方法中加入如下代碼:
System.out.println("key= "+key.toString());//查看key值 System.out.println("value= "+value.toString());//查看value值
運行程序後控制檯輸出以下:
咱們能夠看出,map方法中的value值存儲的是文本文件中的一行,而key值爲該行的首字符相對於文本文件的首地址的偏移量。
程序中的StringTokenizer
這個類的功能是將每一行拆分紅一個一個的單詞,並將<word,1>做爲map方法的結果輸出。
源碼:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException { int sum = 0; for(IntWritable val:values) { sum += val.get(); } result.set(sum); context.write(key,result); } }
一樣,Reduce過程也須要繼承一個Reducer類,並重寫reduce方法。
咱們能夠看到reduce的輸入參數是Text key
和Iterable<Intrable>
。咱們知道reduce方法的輸入參數key是一個單詞,而values是由各個Mapper上對應單詞的計數值所組成的列表,咱們能夠看到values實現了一個Iterable接口,能夠理解成values裏面包含了多個IntWritable整數,其實也就是計數值。
而後咱們只要遍歷values而且求和,就能夠獲得各單詞的總次數了。
咱們已經寫好了map函數和reduce函數,如今就是要執行mapreduce了。
源碼:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } }
代碼中的job.set*()
方法是爲對任務的參數進行相關的設置,而後調用job.waitForCompletion()
方法執行任務。