MapReduce採用"分而治之"的思想,把對大規模數據集的操做,分發給一個主節點管理下的各個分節點共同完成,而後經過整合各個節點的中間結果,獲得最終結果。簡單地說,MapReduce就是"任務的分解與結果的彙總"。html
在Hadoop中,用於執行MapReduce任務的機器角色有兩個:一個是JobTracker;另外一個是 TaskTracker,JobTracker是用於調度工做的,TaskTracker是用於執行工做的。一個Hadoop集羣中只有一臺 JobTracker。java
在分佈式計算中,MapReduce框架負責處理了並行編程中分佈式存儲、工做調度、負載均衡、容錯均衡、容錯處理以及網絡通訊等複雜問題,把 處理過程高度抽象爲兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多任務處理的結果彙總起來。apache
須要注意的是,用MapReduce來處理的數據集(或任務)必須具有這樣的特色:待處理的數據集能夠分解成許多小的數據集,並且每個小數據集均可以徹底並行地進行處理。編程
在Hadoop中,每一個MapReduce任務都被初始化爲一個Job,每一個Job又能夠分爲兩種階段:map階段和reduce階段。這兩個 階段分別用兩個函數表示,即map函數和reduce函數。map函數接收一個<key,value>形式的輸入,而後一樣產生一 個<key,value>形式的中間輸出,Hadoop函數接收一個如<key,(list of values)>形式的輸入,而後對這個value集合進行處理,每一個reduce產生0或1個輸出,reduce的輸出也 是<key,value>形式的。數組
MapReduce處理大數據集的過程服務器
單詞計數是最簡單也是最能體現MapReduce思想的程序之一,能夠稱爲MapReduce版"Hello World",該程序的完整代碼能夠在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文本文件中每一個 單詞出現的次數,以下圖所示。網絡
如今以"hadoop"普通用戶登陸"Master.Hadoop"服務器。app
1)建立本地示例文件 負載均衡
首先在"/home/hadoop"目錄下建立文件夾"file"。框架
接着建立兩個文本文件file1.txt和file2.txt,使file1.txt內容爲"Hello World",而file2.txt的內容爲"Hello Hadoop"。
2)在HDFS上建立輸入文件夾
3)上傳本地file中文件到集羣的input目錄下
1)在集羣上運行WordCount程序
備註:以input做爲輸入目錄,output目錄做爲輸出目錄。
已經編譯好的WordCount的Jar在"/usr/hadoop"下面,就是"hadoop-examples-1.0.0.jar",因此在下面執行命令時記得把路徑寫全了,否則會提示找不到該Jar包。
2)MapReduce執行過程顯示信息
Hadoop命令會啓動一個JVM來運行這個MapReduce程序,並自動得到Hadoop的配置,同時把類的路徑(及其依賴關係)加入到 Hadoop的庫中。以上就是Hadoop Job的運行記錄,從這裏能夠看到,這個Job被賦予了一個ID號:job_201202292213_0002,並且得知輸入文件有兩個(Total input paths to process : 2),同時還能夠了解map的輸入輸出記錄(record數及字節數),以及reduce輸入輸出記錄。好比說,在本例中,map的task數量是2 個,reduce的task數量是一個。map的輸入record數是2個,輸出record數是4個等信息。
1)查看HDFS上output目錄內容
從上圖中知道生成了三個文件,咱們的結果在"part-r-00000"中。
2)查看結果輸出文件內容
Hadoop提供了以下內容的數據類型,這些數據類型都實現了WritableComparable接口,以便用這些類型定義的數據能夠被序列化進行網絡傳輸和文件存儲,以及進行大小比較。
BooleanWritable:標準布爾型數值
ByteWritable:單字節數值
DoubleWritable:雙字節數
FloatWritable:浮點數
IntWritable:整型數
LongWritable:長整型數
Text:使用UTF8格式存儲的文本
NullWritable:當<key,value>中的key或value爲空時使用
1)源代碼程序
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;public class WordCount {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));JobClient.runJob(conf);
}
}
3)主方法Main分析
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));JobClient.runJob(conf);
}
首先講解一下Job的初始化過程。main函數調用Jobconf類來對MapReduce Job進行初始化,而後調用setJobName()方法命名這個Job。對Job進行合理的命名有助於更快地找到Job,以便在JobTracker和Tasktracker的頁面中對其進行監視。
JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );
接着設置Job輸出結果<key,value>的中key和value數據類型,由於結果是<單詞,個數>,因此 key設置爲"Text"類型,至關於Java中String類型。Value設置爲"IntWritable",至關於Java中的int類型。
conf.setOutputKeyClass(Text.class );
conf.setOutputValueClass(IntWritable.class );
而後設置Job處理的Map(拆分)、Combiner(中間結果合併)以及Reduce(合併)的相關處理類。這裏用Reduce類來進行Map產生的中間結果合併,避免給網絡數據傳輸產生壓力。
conf.setMapperClass(Map.class );
conf.setCombinerClass(Reduce.class );
conf.setReducerClass(Reduce.class );
接着就是調用setInputPath()和setOutputPath()設置輸入輸出路徑。
conf.setInputFormat(TextInputFormat.class );
conf.setOutputFormat(TextOutputFormat.class );
(1)InputFormat和InputSplit
InputSplit是Hadoop定義的用來傳送給每一個單獨的map的數據,InputSplit存儲的並非數據自己,而是一個分片長度和一個記錄數據位置的數組。生成InputSplit的方法能夠經過InputFormat()來設置。
當數據傳送給map時,map會將輸入分片傳送到InputFormat,InputFormat則調用方法getRecordReader()生成RecordReader,RecordReader再經過creatKey()、creatValue()方法建立可供map處理的<key,value>對。簡而言之,InputFormat()方法是用來生成可供map處理的<key,value>對的。
Hadoop預約義了多種方法將不一樣類型的輸入數據轉化爲map可以處理的<key,value>對,它們都繼承自InputFormat,分別是:
InputFormat
|
|---BaileyBorweinPlouffe.BbpInputFormat
|---ComposableInputFormat
|---CompositeInputFormat
|---DBInputFormat
|---DistSum.Machine.AbstractInputFormat
|---FileInputFormat
|---CombineFileInputFormat
|---KeyValueTextInputFormat
|---NLineInputFormat
|---SequenceFileInputFormat
|---TeraInputFormat
|---TextInputFormat
其中TextInputFormat是Hadoop默認的輸入方法,在TextInputFormat中,每一個文件(或其一部分)都會單獨地做爲map的輸入,而這個是繼承自FileInputFormat的。以後,每行數據都會生成一條記錄,每條記錄則表示成<key,value>形式:
value值是每行的內容,數據類型是Text。
(2)OutputFormat
每一種輸入格式都有一種輸出格式與其對應。默認的輸出格式是TextOutputFormat,這種輸出方式與輸入相似,會將每條記錄以一行的形式存入文本文件。不過,它的鍵和值能夠是任意形式的,由於程序內容會調用toString()方法將鍵和值轉換爲String類型再輸出。
3)Map類中map方法分析
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
Map類繼承自MapReduceBase,而且它實現了Mapper接口,此接口是一個規範類型,它有4種形式的參數,分別用來指定map的輸入key值類型、輸入value值類型、輸出key值類型和輸出value 值類型。在本例中,由於使用的是TextInputFormat,它的輸出key值是LongWritable類型,輸出value值是Text類型,所 以map的輸入類型爲<LongWritable,Text>。在本例中須要輸出<word,1>這樣的形式,所以輸出的key 值類型是Text,輸出的value值類型是IntWritable。
實現此接口類還須要實現map方法,map方法會具體負責對輸入進行操做,在本例中,map方法對輸入的行以空格爲單位進行切分,而後使用OutputCollect收集輸出的<word,1>。
4)Reduce類中reduce方法分析
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
Reduce類也是繼承自MapReduceBase的,須要實現Reducer接口。Reduce類以map的輸出做爲輸入,所以Reduce的輸入類型是<Text,Intwritable>。而Reduce的輸出是單詞和它的數目,所以,它的輸出類型是<Text,IntWritable>。Reduce類也要實現reduce方法,在此方法中,reduce函數將輸入的key值做爲輸出的key值,而後將得到多個value值加起來,做爲輸出的值。
1)源代碼程序
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1)Map過程
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Map過程須要繼承org.apache.hadoop.mapreduce包中Mapper類,並重寫其 map方法。經過在map方法中添加兩句把key值和value值輸出到控制檯的代碼,能夠發現map方法中value值存儲的是文本文件中的一行(以回 車符爲行結束標記),而key值爲該行的首字母相對於文本文件的首地址的偏移量。而後StringTokenizer類將每一行拆分紅爲一個個的單詞,並 將<word,1>做爲map方法的結果輸出,其他的工做都交有MapReduce框架處理。
2)Reduce過程
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Reduce過程須要繼承org.apache.hadoop.mapreduce包中Reducer類,並重寫其reduce方法。Map過程輸出<key,values>中key爲單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入,因此reduce方法只要遍歷values並求和,便可獲得某個單詞的總次數。
3)執行MapReduce任務
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
在MapReduce中,由Job對象負責管理和運行一個計算任務,並經過Job的一些方法對任務的參數進行相關的設置。此處設置了使用 TokenizerMapper完成Map過程當中的處理和使用IntSumReducer完成Combine和Reduce過程當中的處理。還設置了Map 過程和Reduce過程的輸出類型:key的類型爲Text,value的類型爲IntWritable。任務的輸出和輸入路徑則由命令行參數指定,並由FileInputFormat和FileOutputFormat分別設定。完成相應任務的參數設定後,便可調用job.waitForCompletion()方法執行任務。
本節將對WordCount進行更詳細的講解。詳細執行步驟以下:
1)將文件拆分紅splits,因爲測試用的文件較小,因此每一個文件爲一個split,並將文件按行分割形 成<key,value>對,如圖4-1所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的字符數 (Windows和Linux環境會不一樣)。
圖4-1 分割過程
2)將分割好的<key,value>對交給用戶定義的map方法進行處理,生成新的<key,value>對,如圖4-2所示。
圖4-2 執行map方法
3)獲得map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key至相同value值累加,獲得Mapper的最終輸出結果。如圖4-3所示。
圖4-3 Map端排序及Combine過程
4)Reducer先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,獲得新的<key,value>對,並做爲WordCount的輸出結果,如圖4-4所示。
圖4-4 Reduce端排序及輸出結果
Hadoop最新版本的MapReduce Release 0.20.0的API包括了一個全新的Mapreduce JAVA API,有時候也稱爲上下文對象。
新的API類型上不兼容之前的API,因此,之前的應用程序須要重寫才能使新的API發揮其做用 。
新的API和舊的API之間有下面幾個明顯的區別。
轉自:http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html