(一)Hadoop之Mapreduce的基礎入門實例WordCount詳解

Mapreduce初析

  Mapreduce是一個計算框架,既然是作計算的框架,那麼表現形式就是有個輸入(input)。mapreduce操做這個輸入(input),經過自己定義好的計算模型,獲得一個輸出(output),這個輸出就是咱們所須要的結果。java

  在運行一個mapreduce計算任務時候,任務過程被分爲兩個階段:map階段和reduce階段,每一個階段都是用鍵值對(key/value)做爲輸入(input)和輸出(output)。而咱們程序員要作的就是定義好這兩個階段的函數:map函數和reduce函數。程序員

img

Mapreduce的基礎實例

  講解mapreduce運行原理前,首先咱們看看mapreduce裏的hello world實例WordCount,這個實例在任何一個版本的hadoop安裝程序裏都會有。apache

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * 
 * 描述:WordCount explains by York
  * @author Hadoop Dev Group
 */
publicclass WordCount {
    /**
     * 創建Mapper類TokenizerMapper繼承自泛型類Mapper
     * Mapper類:實現了Map功能基類
     * Mapper接口:
     * WritableComparable接口:實現WritableComparable的類能夠相互比較。全部被用做key的類應該實現此接口。
     * Reporter 則可用於報告整個應用的運行進度,本例中未使用。 
     * 
     */
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
   /**
      * IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口,
      * 都可以被串行化從而便於在分佈式環境中進行數據交換,你能夠將它們分別視爲int,String 的替代品。
      * 聲明one常量和word用於存放單詞的變量
    */
    privatefinalstatic IntWritable one =new IntWritable(1);
    private Text word =new Text();
    /**
         * Mapper中的map方法:
         * void map(K1 key, V1 value, Context context)
         * 映射一個單個的輸入k/v對到一箇中間的k/v對
         * 輸出對不須要和輸入對是相同的類型,輸入對能夠映射到0個或多個輸出對。
         * Context:收集Mapper輸出的<k,v>對。
         * Context的write(k, v)方法:增長一個(k,v)對到context
         * 程序員主要編寫Map和Reduce函數.這個Map函數使用StringTokenizer函數對字符串進行分隔,經過write方法把單詞存入word中
         * write方法存入(單詞,1)這樣的二元組到context中
     */  
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr =new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result =new IntWritable();
    /**
      * Reducer類中的reduce方法:
      * void reduce(Text key, Iterable<IntWritable> values, Context context)
      * 中k/v來自於map函數中的context,可能通過了進一步處理(combiner),一樣經過context輸出           
    */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum =0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  publicstaticvoid main(String[] args) throws Exception {
    /**
      * Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工做
    */
    Configuration conf =new Configuration();
    String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length !=2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job =new Job(conf, "word count");    //設置一個用戶定義的job名稱
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);    //爲job設置Mapper類
    job.setCombinerClass(IntSumReducer.class);    //爲job設置Combiner類
    job.setReducerClass(IntSumReducer.class);    //爲job設置Reducer類
    job.setOutputKeyClass(Text.class);        //爲job的輸出數據設置Key類
    job.setOutputValueClass(IntWritable.class);    //爲job輸出設置value類
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //爲job設置輸入路徑
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//爲job設置輸出路徑
    System.exit(job.waitForCompletion(true) ?0 : 1);        //運行job
  }
}

WordCount逐行解析

  • 對於map函數的方法:
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}

  這裏有三個參數,前面兩個Object key, Text value 就是輸入的 key 和 value ,第三個參數Context context這是能夠記錄輸入的key和value,例如:context.write(word, one)。app

  • 對於reduce函數的方法:
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

  reduce函數的輸入也是一個 key/value 的形式,不過它的value是一個迭代器的形式 Iterable<IntWritable> values ,也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context做用一致。至於計算的邏輯則須要程序員編碼實現。框架

  • 對於main函數的調用:

首先是分佈式

Configuration conf = new Configuration();

運行MapReduce程序前都要初始化Configuration,該類主要是讀取MapReduce系統配置信息,這些信息包括hdfs還有MapReduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件裏的信息,有些童鞋不理解爲啥要這麼作,這個是沒有深刻思考MapReduce計算框架形成,咱們程序員開發MapReduce時候只是在填空,在map函數和reduce函數裏編寫實際進行的業務邏輯,其它的工做都是交給MapReduce框架本身操做的,可是至少咱們要告訴它怎麼操做啊,好比hdfs在哪裏,MapReduce的jobstracker在哪裏,而這些信息就在conf包下的配置文件裏。函數

接下來的代碼是:oop

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }

  If的語句好理解,就是運行WordCount程序時候必定是兩個參數,若是不是就會報錯退出。至於第一句裏的GenericOptionsParser類,它是用來解釋經常使用hadoop命令,並根據須要爲Configuration對象設置相應的值,其實平時開發裏咱們不太經常使用它,而是讓類實現Tool接口,而後再main函數裏使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。編碼

接下來的代碼是:spa

Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

  第一行就是在構建一個job,在mapreduce框架裏一個mapreduce任務也叫mapreduce做業也叫作一個mapreduce的job,而具體的map和reduce運算就是task了,這裏咱們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。

  第二行就是裝載程序員編寫好的計算程序,例如咱們的程序類名就是WordCount了。這裏我要作下糾正,雖然咱們編寫mapreduce程序只須要實現map函數和reduce函數,可是實際開發咱們要實現三個類,第三個類是爲了配置mapreduce如何運行map和reduce函數,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。

  第三行和第五行就是裝載map函數和reduce函數實現類了,這裏多了個第四行,這個是裝載Combiner類,這個類和mapreduce運行機制有關,其實本例去掉第四行也沒有關係,可是使用了第四行理論上運行效率會更好。

接下來的代碼:

job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

  這個是定義輸出的 key/value 的類型,也就是最終存儲在 hdfs 上結果文件的 key/value 的類型。

最後的代碼是:

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  第一行就是構建輸入的數據文件,第二行是構建輸出的數據文件,最後一行若是job運行成功了,咱們的程序就會正常退出。

相關文章
相關標籤/搜索