逐行分析Hadoop的HelloWorld

學寫代碼的時候,咱們老是先從helloworld開始寫起,那麼學習Hadoop,咱們也必不可少的從helloworld開始,那麼WordCount做爲經典的Hadoop程序,能夠做爲咱們庖丁解牛的材料,進而從代碼的角度學習一下mapreduce的實現過程。下面咱們就開始一步步的探索。java

先從源碼看起,再一步步剖析apache

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "172.16.10.15:9001");//本身額外加的代碼
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

你們能夠看到整個源代碼分爲三個部分:網絡

1. Mapapp

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) 
throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

a) 定義一個本身的Map過程,TokenizerMapper 這個類名本身設定,這個類須要繼承org.apache.hadoop.mapreduce包中的Mapper類,四個參數分別表示輸入鍵key的參數類型,輸入值value的參數類型,輸出鍵key的參數類型,輸出值value的參數類型 值得注意的是Hadoop自己提供了一套可優化的網絡序列化傳輸的基本類型,而不是用java內嵌的類型。這些類型都是在org.apache.hadoop.io包中。其中LongWritable類型至關於Long類型,Text類型至關於String類型,IntWritable至關於Integer類型。
b) map方法中參數value是指文本文件中的一行,參數key是爲該行首字母相對於文本文件首地址的偏移量
c) StringTokenizer類是一個用來分隔String的應用類,相似於split。框架

//它的構造函數有三種:
public StringTokenizer(String str)
public StringTokenizer(String str,String delim)
public StringTokenizer(String str,String delim,boolean returnDelims)
//其中第一個參數爲要分隔的String,第二個參數爲分隔字符集合,第三個參數爲分隔符是否做爲標記返回,若是不指定分隔符,默認是'\t\n\r\f'
//它的方法主要有三種:
public boolean hasMoreTokens()//返回是否還有分隔符
public String nextToken()//返回從當前位置到下一個分隔符的字符串
public int countTokens()//返回nextToken方法被調用的次數

d) 通過StringTolenizer 處理以後會獲得一個個 < word,1 > 這樣的鍵值對,放在context裏,Context用於輸出內容的寫入,讀起來有點兒繞口,本身理解一下。eclipse

2. Reduce函數

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	private IntWritable result = new IntWritable();

	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		result.set(sum);
		context.write(key, result);
	}
}

a) 同mapper 過程同樣,Reduce過程須要繼承org.apache.hadoop.mapreduce包中Reducer類,並重寫其reduce方法。 
b) reduce方法中輸入參數key 指單個單詞,values 指對應單詞的計數值的列表 
c) reduce 方法的目的就是對列表的值進行加和處理 
d) 輸出的是< key,value>,key 指單個單詞,value 指對應單詞的計數值的列表的值的總和。oop

3. Main學習

public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	conf.set("mapred.job.tracker", "172.16.10.15:9001");//本身額外加的代碼
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: wordcount <in> <out>");
		System.exit(2);
	}
	Job job = new Job(conf, "word count");
	job.setJarByClass(WordCount.class);
	job.setMapperClass(TokenizerMapper.class);
	job.setCombinerClass(IntSumReducer.class);
	job.setReducerClass(IntSumReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	System.exit(job.waitForCompletion(true) ? 0 : 1);
}

a) Configuration conf = new Configuration(); 默認狀況下,Configuration開始實例化的時候,會從Hadoop的配置文件裏讀取參數。 
b) conf.set(「mapred.job.tracker」, 「172.16.10.15:9001」);設置這句代碼是因爲咱們要把使用eclipse提交做業到Hadoop集羣,因此手動添加Job運行地址。如果直接在Hadoop 集羣進行運行,不用加這句代碼。 並且你能夠看到只要前三句使用了這個代碼,因此這三句之後的代碼纔是全部Hadoop例子中都會包含的。
c) 接下來這一句也是讀取參數,這裏是從命令行參數裏讀取參數。 
d) Job job = new Job(conf, 「word count」); 在MapReduce處理過程當中,由Job對象負責管理和運行一個計算任務,而後經過Job的若干方法來對任務的參數進行設置。」word count」是Job的名字,(固然了,根據全部java語言規範規定的那樣,你也能夠用測試

Job job = new Job();
job.setJobName("Name");

的形式作聲明)。 
e) job.setJarByClass(WordCount.class);是根據WordCount類的位置設置Jar文件 。

爲何要這麼作?由於咱們在Hadoop集羣上運行這個做業時候,要把代碼打包成一個JAR文件,用以在集羣上發佈這個文件。Hadoop利用這個傳遞進去的類來查找包含它的JAR文件。
f) job.setMapperClass(TokenizerMapper.class);設置Mapper 
g) job.setCombinerClass(IntSumReducer.class);設置Combiner,這裏先使用Reduce類來進行Mapper 的中間結果的合併,可以減輕網絡傳輸的壓力。 
h) job.setReducerClass(IntSumReducer.class);設置Reduce 
i) job.setOutputKeyClass(Text.class);和 job.setOutputValueClass(IntWritable.class);分別是設置輸出鍵的類型和設置輸出值的類型 
j) FileInputFormat.addInputPath(job, new Path(otherArgs[0]));設置輸入文件,它是otherArgs第一個參數 
k) FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));設置輸出文件,將輸出結果寫入這個文件裏,它是otherArgs第二個參數 。

注意:在運行做業前這個輸出目錄不該該存在,不然Hadoop會報錯並拒絕運行該做業。這種預防措施的目的是防止數據丟失(若是長時間運行的數據結果被意外覆蓋,確定是很是惱人的)
l) System.exit(job.waitForCompletion(true) ? 0 : 1);job執行,等待執行結果

4. 各個包的功能

到此爲止,三大部分就分析完畢,而後再來看看引入的有哪些類:

a) package org.apache.hadoop.examples;Java 提供包機制管理代碼,關鍵詞是package, 包名字能夠本身定,但不能重複。一般爲了包的惟一性,推薦使用公司域名的逆序做爲包,因而有了上面例子中的‘org.apache.hadoop’這樣的包名。 
b) import java.io.IOException; 凡是以java開頭的包,在JDK1.7的API裏能夠找到類的資料。這裏是從java.io中引入IOException,是一個輸入輸出異常類。 
c) import java.util.StringTokenizer;這是從java.util包中引入的StringTokenizer類,是一個解析文本的類。具體用法上文中已提過了。 
d) import org.apache.hadoop.conf.Configuration;凡是以org.apache.hadoop開頭的包,在Hadoop1.2.1 的API文檔能夠找到類的資料。這裏是從hadoop的conf包中引入Configuration類,它是一個讀寫和保存配置信息的類。 
e) import org.apache.hadoop.fs.Path;  Path類保存文件或者目錄的路徑字符串 
f) import org.apache.hadoop.io.IntWritable;  IntWritable是一個以類表示的可序化的整數。在java中,要表示一個整數,可使用int類型,也可使用integer類型,integer封裝了int類型,且integer類是可序化的。但Hadoop認爲integer的可序化不合適,因而實現了IntWritable。 
g) import org.apache.hadoop.io.Text;  從io包中引入Text類,是一個存儲字符串的可比較可序化的類。 
h) import org.apache.hadoop.mapreduce.Job;  引入Job類,Hadoop中每一個須要執行的任務是一個Job,這個Job負責參數配置、設置MapReduce細節、提交到Hadoop集羣、執行控制等操做。 
i) import org.apache.hadoop.mapreduce.Mapper;引入Mapper類,負責MapReduce中的Map過程。 
j) import org.apache.hadoop.mapreduce.Reducer;引入Reduce類,負責MapReduce中的Reduce過程。 
k) import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;引入FileInputFormat類,主要功能是將文件進行切片。 
l) import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;FileOutputFormat類是將輸出結果寫入文件。 
m) import org.apache.hadoop.util.GenericOptionsParser;這個類負責解析命令行參數。

 

從代碼的功能上,咱們已經對map reduce有了一個清晰的認識,那麼wordcount程序具體是怎麼執行的呢?

將文件file1.txt,file2.txt 上傳到hdfs中的hdfsinput1文件夾裏(上傳的方式能夠經過eclipse客戶端,也能夠經過Hadoop命令行),而後在eclipse上編寫wordcount.java文件(也便是第一部分分析的源碼) 
這裏寫圖片描述

因爲測試用的文件較小,因此每一個文件爲一個split,並將文件按行分割造成< key,value>,這一步由MapReduce框架自動完成,其中key值爲該行首字母相對於文本文件首地址的偏移量。 
這裏寫圖片描述

將分割好的< key,value>對交給本身定義的map方法,輸出新的< key,value>對。 
這裏寫圖片描述

獲得map方法輸出的< key,value>對後,進行Combine操做。這裏Combine 執行的是Reduce的代碼。 
這裏寫圖片描述

一樣,在Reduce過程當中先對輸入的數據進行排序,再交由自定義的reduce方法進行處理,獲得新的< key,value>對,並做爲WordCount的輸出結果,輸出結果存放在第一張圖的lxnoutputssss文件夾下的part-r-00000裏。 
這裏寫圖片描述

相關文章
相關標籤/搜索