Hadoop的「Hello world」---WordCount

在安裝並配置好Hadoop環境以後,須要運行一個實例來驗證配置是否正確,Hadoop就提供了一個簡單的wordcount程序,其實就是統計單詞個數的程序,這個程序能夠算是Hadoop中的「Hello World」了。java

MapReduce

原理

MapReduce其實就是採用分而治之的思想,將大規模的數據分紅各個節點共同完成,而後再整合各個節點的結果,獲得最終的結果。這些分節點處理數據均可以作到並行處理,大大縮減了工做的複雜度。apache

過程

MapReduce能夠分紅兩個階段,其實就是單詞拆成map和reduce,這實際上是兩個函數。map函數會產生一箇中間輸出,而後reduce函數接受多個map函數產生的一系列中間輸出而後再產生一個最終輸出。app

WordCount展現

前期工做

啓動hadoop

cd /usr/hadoop/hadoop-2.6.2/
sbin/start-dfs.sh
sbin/start-yarn.sh

建立本地數據文件

cd ~/
mkdir ~/file
cd file
echo "Hello World" > test1.txt
echo "Hello Hadoop" > test2.txt

這樣就建立了兩個txt文件,裏面分別有一個字符串:Hello World,Hello Hadoop。咱們經過wordcount想要獲得的結果是這樣的:Hello 2,World 1,Hadoop 1。函數

在HDFS上建立輸入文件夾

hadoop fs -mkdir /input

建立好咱們能夠經過oop

hadoop fs -ls /

來查看結果:源碼分析

將數據文件傳到input目錄下

hadoop fs -put ~/file/test*.txt /input

一樣,咱們能夠經過spa

hadoop fs -ls /input

來查看是否上傳成功:3d

若是看不到任何結果,說明在hadoop的配置中存在問題,或者是防火牆沒有關閉,致使節點鏈接不通。code

運行程序

運行wordcount

hadoop jar /你的hadoop根目錄/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.2.jar wordcount /input /output

運行這條命令後,Hadoop會啓動一個JVM來運行MapReduce程序,並且會在集羣上建立一個output文件夾,將結果存在其中。orm

咱們來看看結果:

注意點:

  • 這個目錄必定要填對,要否則會報jar不存在。

  • 輸出文件夾必定要是空文件夾。

查看結果

output文件夾中如今有兩個文件,咱們須要的結果在part-r-00000這個文件夾中。

hadoop fs -cat /output/part-r-00000

咱們就能夠看到最終的wordcount結果了:

WordCount源碼分析

Map過程

源碼:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    IntWritable one = new IntWritable(1);
    Text word = new Text();
    public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while(itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

繼承Mapper類,重寫map方法。

咱們瞭解到mapreduce中數據都是經過<key,value>傳遞的。咱們能夠經過控制檯來看看其中的value值和key值是什麼樣的。在map方法中加入如下代碼:

System.out.println("key= "+key.toString());//查看key值
System.out.println("value= "+value.toString());//查看value值

運行程序後控制檯輸出以下:

咱們能夠看出,map方法中的value值存儲的是文本文件中的一行,而key值爲該行的首字符相對於文本文件的首地址的偏移量。

程序中的StringTokenizer這個類的功能是將每一行拆分紅一個一個的單詞,並將<word,1>做爲map方法的結果輸出。

Reduce過程

源碼:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable result = new IntWritable();
    public void reduce(Text    key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
        int sum = 0;
        for(IntWritable val:values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key,result);
    }
}

一樣,Reduce過程也須要繼承一個Reducer類,並重寫reduce方法。

咱們能夠看到reduce的輸入參數是Text keyIterable<Intrable>。咱們知道reduce方法的輸入參數key是一個單詞,而values是由各個Mapper上對應單詞的計數值所組成的列表,咱們能夠看到values實現了一個Iterable接口,能夠理解成values裏面包含了多個IntWritable整數,其實也就是計數值。

而後咱們只要遍歷values而且求和,就能夠獲得各單詞的總次數了。

執行MapReduce

咱們已經寫好了map函數和reduce函數,如今就是要執行mapreduce了。

源碼:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    } 
}

代碼中的job.set*()方法是爲對任務的參數進行相關的設置,而後調用job.waitForCompletion()方法執行任務。

原文連接:http://axuebin.com/blog/2016/02/14/hadoop-wordcount/

相關文章
相關標籤/搜索