不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

學習Hadoop搞明白Shuffle的原理是很是重要的,然而相信不少人看了《Hadoop權威指南4》好幾篇,也沒有真正搞明白它真正的原理。看完這篇文章,相信會對你理解Shuffle有很大的幫助。

官方給的定義:系統執行排序、將map輸出做爲輸入傳給reducer的過程稱爲Shuffle。(看完是否是一臉懵逼)
通俗來說,就是從map產生輸出開始到reduce消化輸入的整個過程稱爲Shuffle。以下圖用黑線框出的部分:
Shuffle介紹java

圓形緩衝區介紹:apache

1.png每個map任務都會有一個圓形緩衝區。默認大小100MB(io.sort.mb屬性)閾值0.8也就是80MB(io.sort.spill.percent屬性指定) app

zh2.jpg一旦達到閾值一個後臺線程開始把內容寫到(spill)磁盤的指定目錄mapred.local.dir下的新建的一個溢出寫文件。寫入磁盤前先partition、sort、[combiner]。一個map task任務可能產生N個磁盤文件。
map task運算完以後,產生了N個文件,而後將這些文件merge合成一個文件。
若是N=2,合成的新文件寫入磁盤前只通過patition(分區)和sort(排序)過程,不會執行combiner合併(不管是否指定combiner類),以下圖所示:
2.pngide

若是N>=3,合成的新文件寫入磁盤前通過patition(分區)、sort(排序)過和combiner合併(前提是指定了combiner類),以下圖所示:
3.pngoop

思考:爲何只有當N>=3時,合成文件纔會執行combiner呢?
這是由於若是N<3時,執行combiner雖然減小了文件的大小,可是同時產生了必定的系統開銷。因爲減小的文件大小不大,權衡利弊後,肯定N<2時不在執行combiner操做。
當該map task所有執行完以後,對應的reduce task將會拷貝對應分區的數據(該過程稱爲fetch),以下圖所示:
4.png學習

其它的map task任務完成後,對應的reduce task也一樣執行fetch操做,以下圖所示:
5.pngfetch

每一個map任務的完成時間可能不一樣,所以只要有一個任務完成,reduce任務就開始複製其輸出。該階段被稱爲reduce的複製階段。reduce任務有少許複製線程,所以可以並行取得map輸出。默認值是5個線程,但這個默認值能夠經過設置mapred.reduce.parallel.copies屬性改變。
6.png優化

複製完全部map輸出後,reduce任務進入合併階段,該階段將合併map輸出,並維持其順序排序(至關於執行了sort),若是指定了combiner,在寫入磁盤前還會執行combiner操做。spa

那麼具體是如何合併的呢?
合併因子默認是10,能夠經過io.sort.factor屬性設置。合併過程是循環進行了,可能叫通過多趟合併。目標是合併最小數量的文件以便知足最後一趟的合併係數。假設有40個文件,咱們不會在四趟中每趟合併10個文件從而獲得4個文件。相反,第一趟只合並4個文件,隨後的三趟分別合併10個文件。再最後一趟中4個已合併的文件和餘下的6個(未合併的)文件合計10個文件。具體流程以下圖所示:
8.png線程

注意:這並無改變合併次數,它只是一個優化措施,目的是儘可能減小寫到磁盤的數據量,由於最後一趟老是直接合併到reduce。
看到這裏您是否理解了Shuffle的具體原理呢,若是沒有,也沒有關係,接下來咱們經過一個wordcount案例再將整個流程梳理一遍。
首先map任務的代碼以下:
`public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {

String line = ivalue.toString();
String words[] = line.split(" ");
for (String word : words) {
  context.write(new Text(word), new LongWritable(1));
}

}
}
`
9.png
在分區(分區規則:按首字母分四個區,分別爲a-i,j-q,r-z,其它)的過程當中,會將相同的單詞合併到一塊兒,將出現次數用逗號隔開,如上圖所示。注意此時尚未排序。分區代碼以下:
`package cn.geekmooc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WCPatitioner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {

int first_char = key.charAt(0);
if(first_char>=97&&first_char<=105){
  return 0;
}else if(first_char>=106&&first_char<=113){
  return 1;
}else if(first_char>=114&&first_char<=122){
  return 2;
}else{
  return 3;
}

}
}`
接着執行排序操做,默認排序規則是按照key的字典升序排序,固然你也能夠指定排序規則,排序後以下圖所示:
10.png

接下來執行combiner操做,將每一個單詞後續的1求和,WCCombiner類代碼以下:
`package cn.geekmooc;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,

Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
System.out.println(key.toString()+":"+values.toString());
long count = 0;
Iterator<LongWritable> iter = values.iterator();
while(iter.hasNext()){
  count += iter.next().get();
}
context.write(key, new LongWritable(count));

}
}`
combiner的結果以下圖所示
11.png

map任務執行完,產生N個spill文件,接着對N個文件進行合併,分如下兩種狀況:
1.N<3,不管是否指定combiner類,合併文件時都不會執行combiner
12.png

2.N>=3,若是指定了combiner類將執行combiner操做,以下圖:
13.png

接下來進入fetch(或copy)階段
14.png

而後在reduce端進行合併
15.png

而後執行最後一趟合併,並將結果直接傳給reduce
16.png

reduce類代碼以下:
`package cn.geekmooc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,

Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
System.out.println(key.toString()+":"+values.toString());
long count = 0;
for (LongWritable val : values) {
  count += val.get();
}
context.write(key, new LongWritable(count));

}
}`
reduce task執行後,輸出結果:
17.png

視頻講解請點擊此處

相關文章
相關標籤/搜索