轉https://blog.csdn.net/shujuelin/article/details/79119214java
Hadoop的框架最核心的設計就是:HDFS和MapReduce。HDFS爲海量的數據提供了存儲,MapReduce則爲海量的數據提供了計算。
HDFS是Google File System(GFS)的開源實現,MapReduce是Google MapReduce的開源實現。
HDFS和MapReduce實現是徹底分離的,並非沒有HDFS就不能MapReduce運算。
本文主要參考瞭如下三篇博客學習整理而成。
一、Hadoop示例程序WordCount詳解及實例
二、hadoop 學習筆記:mapreduce框架詳解
三、hadoop示例程序wordcount分析
4,初學hadoop之圖解MapReduce與WordCount示例分析
一、MapReduce總體流程
最簡單的MapReduce應用程序至少包含 3 個部分:一個 Map 函數、一個 Reduce 函數和一個 main 函數。在運行一個mapreduce計算任務時候,任務過程被分爲兩個階段:map階段和reduce階段,每一個階段都是用鍵值對(key/value)做爲輸入(input)和輸出(output)。main 函數將做業控制和文件輸入/輸出結合起來。
並行讀取文本中的內容,而後進行MapReduce操做。
Map過程:並行讀取文本,對讀取的單詞進行map操做,每一個詞都以<key,value>形式生成。
個人理解:
一個有三行文本的文件進行MapReduce操做。
讀取第一行Hello World Bye World ,分割單詞造成Map。
<Hello,1> <World,1> <Bye,1> <World,1>
讀取第二行Hello Hadoop Bye Hadoop ,分割單詞造成Map。
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
讀取第三行Bye Hadoop Hello Hadoop,分割單詞造成Map。
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>
Reduce操做是對map的結果進行排序,合併,最後得出詞頻。
個人理解:
通過進一步處理(combiner),將造成的Map根據相同的key組合成value數組。
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>
循環執行Reduce(K,V[]),分別統計每一個單詞出現的次數。
<Bye,3> <Hadoop,4> <Hello,3> <World,2>
回到目錄
二、WordCount源碼
複製代碼
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
*
* 描述:WordCount explains by York
* @author Hadoop Dev Group
*/
publicclass WordCount {
/**
* 創建Mapper類TokenizerMapper繼承自泛型類Mapper
* Mapper類:實現了Map功能基類
* Mapper接口:
* WritableComparable接口:實現WritableComparable的類能夠相互比較。全部被用做key的類應該實現此接口。
* Reporter 則可用於報告整個應用的運行進度,本例中未使用。
*
*/
publicstaticclass TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
/**
* IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口,
* 都可以被串行化從而便於在分佈式環境中進行數據交換,你能夠將它們分別視爲int,String 的替代品。
* 聲明one常量和word用於存放單詞的變量
*/
privatefinalstatic IntWritable one =new IntWritable(1);
private Text word =new Text();
/**
* Mapper中的map方法:
* void map(K1 key, V1 value, Context context)
* 映射一個單個的輸入k/v對到一箇中間的k/v對
* 輸出對不須要和輸入對是相同的類型,輸入對能夠映射到0個或多個輸出對。
* Context:收集Mapper輸出的<k,v>對。
* Context的write(k, v)方法:增長一個(k,v)對到context
* 程序員主要編寫Map和Reduce函數.這個Map函數使用StringTokenizer函數對字符串進行分隔,經過write方法把單詞存入word中
* write方法存入(單詞,1)這樣的二元組到context中
*/
publicvoid map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr =new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
publicstaticclass IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result =new IntWritable();
/**
* Reducer類中的reduce方法:
* void reduce(Text key, Iterable<IntWritable> values, Context context)
* 中k/v來自於map函數中的context,可能通過了進一步處理(combiner),一樣經過context輸出
*/
publicvoid reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum =0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
publicstaticvoid main(String[] args) throws Exception {
/**
* Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工做
*/
Configuration conf =new Configuration();
String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length !=2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job =new Job(conf, "word count"); //設置一個用戶定義的job名稱
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //爲job設置Mapper類
job.setCombinerClass(IntSumReducer.class); //爲job設置Combiner類
job.setReducerClass(IntSumReducer.class); //爲job設置Reducer類
job.setOutputKeyClass(Text.class); //爲job的輸出數據設置Key類
job.setOutputValueClass(IntWritable.class); //爲job輸出設置value類
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //爲job設置輸入路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//爲job設置輸出路徑
System.exit(job.waitForCompletion(true) ?0 : 1); //運行job
}
}
複製代碼
回到目錄
三、WordCount逐行解析
對於map函數的方法。
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
這裏有三個參數,前面兩個Object key, Text value就是輸入的key和value,第三個參數Context context這是能夠記錄輸入的key和value,例如:context.write(word, one);此外context還會記錄map運算的狀態。
對於reduce函數的方法。
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}
reduce函數的輸入也是一個key/value的形式,不過它的value是一個迭代器的形式Iterable<IntWritable> values,也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context做用一致。
至於計算的邏輯則須要程序員編碼實現。
對於main函數的調用。
首先是:
Configuration conf = new Configuration();
運行MapReduce程序前都要初始化Configuration,該類主要是讀取MapReduce系統配置信息,這些信息包括hdfs還有MapReduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件裏的信息,有些童鞋不理解爲啥要這麼作,這個是沒有深刻思考MapReduce計算框架形成,咱們程序員開發MapReduce時候只是在填空,在map函數和reduce函數裏編寫實際進行的業務邏輯,其它的工做都是交給MapReduce框架本身操做的,可是至少咱們要告訴它怎麼操做啊,好比hdfs在哪裏,MapReduce的jobstracker在哪裏,而這些信息就在conf包下的配置文件裏。
接下來的代碼是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
If的語句好理解,就是運行WordCount程序時候必定是兩個參數,若是不是就會報錯退出。至於第一句裏的GenericOptionsParser類,它是用來解釋經常使用hadoop命令,並根據須要爲Configuration對象設置相應的值,其實平時開發裏咱們不太經常使用它,而是讓類實現Tool接口,而後再main函數裏使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。
接下來的代碼是:
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
第一行就是在構建一個job,在mapreduce框架裏一個mapreduce任務也叫mapreduce做業也叫作一個mapreduce的job,而具體的map和reduce運算就是task了,這裏咱們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。
第二行就是裝載程序員編寫好的計算程序,例如咱們的程序類名就是WordCount了。這裏我要作下糾正,雖然咱們編寫mapreduce程序只須要實現map函數和reduce函數,可是實際開發咱們要實現三個類,第三個類是爲了配置mapreduce如何運行map和reduce函數,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
第三行和第五行就是裝載map函數和reduce函數實現類了,這裏多了個第四行,這個是裝載Combiner類,這個類和mapreduce運行機制有關,其實本例去掉第四行也沒有關係,可是使用了第四行理論上運行效率會更好。
接下來的代碼:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結果文件的key/value的類型。
最後的代碼是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
第一行就是構建輸入的數據文件,第二行是構建輸出的數據文件,最後一行若是job運行成功了,咱們的程序就會正常退出。
Mapreduce運行機制
MapReduce主要包括JobClient, JobTracker, TaskTracker , HDFS 4個獨立的部分
談mapreduce運行機制,能夠從不少不一樣的角度來描述,好比說從mapreduce運行流程來說解,也能夠從計算模型的邏輯流程來進行講解,也許有些深刻理解了mapreduce運行機制還會從更好的角度來描述,可是將mapreduce運行機制有些東西是避免不了的,就是一個個參入的實例對象,一個就是計算模型的邏輯定義階段,我這裏講解不從什麼流程出發,就從這些一個個牽涉的對象,不論是物理實體仍是邏輯實體。
首先講講物理實體,參入mapreduce做業執行涉及4個獨立的實體:
客戶端(client):編寫mapreduce程序,配置做業,提交做業,這就是程序員完成的工做;
JobTracker:初始化做業,分配做業,與TaskTracker通訊,協調整個做業的執行;
TaskTracker:保持與JobTracker的通訊,在分配的數據片斷上執行Map或Reduce任務,TaskTracker和JobTracker的不一樣有個很重要的方面,就是在執行任務時候TaskTracker能夠有n多個,JobTracker則只會有一個(JobTracker只能有一個就和hdfs裏namenode同樣存在單點故障,我會在後面的mapreduce的相關問題裏講到這個問題的)
Hdfs:保存做業的數據、配置信息等等,最後的結果也是保存在hdfs上面
那麼mapreduce究竟是如何運行的呢?
首先是客戶端要編寫好mapreduce程序,配置好mapreduce的做業也就是job,接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值,接下來它會作檢查操做,這個檢查就是肯定輸出目錄是否存在,若是存在那麼job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,若是不存在一樣拋出錯誤,若是存在JobTracker會根據輸入計算輸入分片(Input Split),若是分片計算不出來也會拋出錯誤,至於輸入分片我後面會作講解的,這些都作好了JobTracker就會配置Job須要的資源了。分配好資源後,JobTracker就會初始化做業,初始化主要作的是將Job放入一個內部的隊列,讓配置好的做業調度器能調度到這個做業,做業調度器會初始化這個job,初始化就是建立一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。
初始化完畢後,做業調度器會獲取輸入分片信息(input split),每一個分片建立一個map任務。接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制按期發送心跳給jobtracker,心跳間隔是5秒,程序員能夠配置這個時間,心跳就是jobtracker和tasktracker溝通的橋樑,經過心跳,jobtracker能夠監控tasktracker是否存活,也能夠獲取tasktracker處理的狀態和問題,同時tasktracker也能夠經過心跳裏的返回值獲取jobtracker給它的操做指令。任務分配好後就是執行任務了。在任務執行時候jobtracker能夠經過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也能夠本地監控本身的狀態和進度。當jobtracker得到了最後一個完成指定任務的tasktracker操做成功的通知時候,jobtracker會把整個job狀態置爲成功,而後當客戶端查詢job運行狀態時候(注意:這個是異步操做),客戶端會查到job完成的通知的。若是job中途失敗,mapreduce也會有相應機制處理,通常而言若是不是程序員程序自己有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。
下面我從邏輯實體的角度講解mapreduce運行機制,這些按照時間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。
1. 輸入分片(input split):在進行map計算以前,mapreduce會根據輸入文件計算輸入分片(input split),每一個輸入分片(input split)針對一個map任務,輸入分片(input split)存儲的並不是數據自己,而是一個分片長度和一個記錄數據的位置的數組,輸入分片(input split)每每和hdfs的block(塊)關係很密切,假如咱們設定hdfs的塊的大小是64mb,若是咱們輸入有三個文件,大小分別是3mb、65mb和127mb,那麼mapreduce會把3mb文件分爲一個輸入分片(input split),65mb則是兩個輸入分片(input split)而127mb也是兩個輸入分片(input split),換句話說咱們若是在map計算前作輸入分片調整,例如合併小文件,那麼就會有5個map任務將執行,並且每一個map執行的數據大小不均,這個也是mapreduce優化計算的一個關鍵點。
2. map階段:就是程序員編寫好的map函數了,所以map函數效率相對好控制,並且通常map操做都是本地化操做也就是在數據存儲節點上進行;
3. combiner階段:combiner階段是程序員能夠選擇的,combiner其實也是一種reduce操做,所以咱們看見WordCount類裏是用reduce進行加載的。Combiner是一個本地化的reduce操做,它是map運算的後續操做,主要是在map計算出中間文件前作一個簡單的合併重複key值的操做,例如咱們對文件裏的單詞頻率作統計,map計算時候若是碰到一個hadoop的單詞就會記錄爲1,可是這篇文章裏hadoop可能會出現n屢次,那麼map輸出文件冗餘就會不少,所以在reduce計算前對相同的key作一個合併操做,那麼文件會變小,這樣就提升了寬帶的傳輸效率,畢竟hadoop計算力寬帶資源每每是計算的瓶頸也是最爲寶貴的資源,可是combiner操做是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,例如:若是計算只是求總數,最大值,最小值可使用combiner,可是作平均值計算使用combiner的話,最終的reduce計算結果就會出錯。
4. shuffle階段:將map的輸出做爲reduce的輸入的過程就是shuffle了,這個是mapreduce優化的重點地方。這裏我不講怎麼優化shuffle階段,講講shuffle階段的原理,由於大部分的書籍裏都沒講清楚shuffle階段。Shuffle一開始就是map階段作輸出操做,通常mapreduce計算的都是海量數據,map輸出時候不可能把全部文件都放到內存操做,所以map寫入磁盤的過程十分的複雜,更況且map輸出時候要對結果進行排序,內存開銷是很大的,map在作輸出時候會在內存裏開啓一個環形內存緩衝區,這個緩衝區專門用來輸出的,默認大小是100mb,而且在配置文件裏爲這個緩衝區設定了一個閥值,默認是0.80(這個大小和閥值都是能夠在配置文件裏進行配置的),同時map還會爲輸出操做啓動一個守護線程,若是緩衝區的內存達到了閥值的80%時候,這個守護線程就會把內容寫到磁盤上,這個過程叫spill,另外的20%內存能夠繼續寫入要寫進磁盤的數據,寫入磁盤和寫入內存操做是互不干擾的,若是緩存區被撐滿了,那麼map就會阻塞寫入內存的操做,讓寫入磁盤操做完成後再繼續執行寫入內存操做,前面我講到寫入磁盤前會有個排序操做,這個是在寫入磁盤操做時候進行,不是在寫入內存時候進行的,若是咱們定義了combiner函數,那麼排序前還會執行combiner操做。
每次spill操做也就是寫入磁盤操做時候就會寫一個溢出文件,也就是說在作map輸出有幾回spill就會產生多少個溢出文件,等map輸出所有作完後,map會合並這些輸出文件。這個過程裏還會有一個Partitioner操做,對於這個操做不少人都很迷糊,其實Partitioner操做和map階段的輸入分片(Input split)很像,一個Partitioner對應一個reduce做業,若是咱們mapreduce操做只有一個reduce操做,那麼Partitioner就只有一個,若是咱們有多個reduce操做,那麼Partitioner對應的就會有多個,Partitioner所以就是reduce的輸入分片,這個程序員能夠編程控制,主要是根據實際key和value的值,根據實際業務類型或者爲了更好的reduce負載均衡要求進行,這是提升reduce效率的一個關鍵所在。到了reduce階段就是合併map輸出文件了,Partitioner會找到對應的map輸出文件,而後進行復制操做,複製操做時reduce會開啓幾個複製線程,這些線程默認個數是5個,程序員也能夠在配置文件更改複製線程的個數,這個複製過程和map寫入磁盤過程相似,也有閥值和內存大小,閥值同樣能夠在配置文件裏配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操做和合並文件操做,這些操做完了就會進行reduce計算了。
5. reduce階段:和map函數同樣也是程序員編寫的,最終結果是存儲在hdfs上的。
node