不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

學習Hadoop搞明白Shuffle的原理是很是重要的,然而相信不少人看了《Hadoop權威指南4》好幾遍,也沒有真正搞明白它真正的原理。看完這篇文章,相信會對你理解Shuffle有很大的幫助。java


官方給的定義:系統執行排序、將map輸出做爲輸入傳給reducer的過程稱爲Shuffle。(看完是否是一臉懵逼)
通俗來說,就是從map產生輸出開始到reduce消化輸入的整個過程稱爲Shuffle。以下圖用黑線框出的部分:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))apache

圓形緩衝區介紹:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))app

每個map任務都會有一個圓形緩衝區。默認大小100MB(io.sort.mb屬性)閾值0.8也就是80MB(io.sort.spill.percent屬性指定) ,ide

一旦達到閾值一個後臺線程開始把內容寫到(spill)磁盤的指定目錄mapred.local.dir下的新建的一個溢出寫文件。寫入磁盤前先partition、sort、[combiner]。一個map task任務可能產生N個磁盤文件。
map task運算完以後,產生了N個文件,而後將這些文件merge合成一個文件。
若是N < 3,合成的新文件寫入磁盤前只通過patition(分區)和sort(排序)過程,不會執行combiner合併(不管是否指定combiner類),以下圖所示:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))oop

若是N>=3,合成的新文件寫入磁盤前通過patition(分區)、sort(排序)過和combiner合併(前提是指定了combiner類),以下圖所示:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))學習

思考:爲何只有當N>=3時,合成文件纔會執行combiner呢?
這是由於若是N< 3時,執行combiner雖然減小了文件的大小,可是同時產生了必定的系統開銷。因爲減小的文件大小不大,權衡利弊後,肯定N< 2時不在執行combiner操做。
當該map task所有執行完以後,對應的reduce task將會拷貝對應分區的數據(該過程稱爲fetch),以下圖所示:fetch

不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))
其它的map task任務完成後,對應的reduce task也一樣執行fetch操做,以下圖所示:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))優化

每一個map任務的完成時間可能不一樣,所以只要有一個任務完成,reduce任務就開始複製其輸出。該階段被稱爲reduce的複製階段。reduce任務有少許複製線程,所以可以並行取得map輸出。默認值是5個線程,但這個默認值能夠經過設置mapred.reduce.parallel.copies屬性改變。
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))線程

複製完全部map輸出後,reduce任務進入合併階段,該階段將合併map輸出,並維持其順序排序(至關於執行了sort),若是指定了combiner,在寫入磁盤前還會執行combiner操做。
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))3d

那麼具體是如何合併的呢?
合併因子默認是10,能夠經過io.sort.factor屬性設置。合併過程是循環進行了,可能叫通過多趟合併。目標是合併最小數量的文件以便知足最後一趟的合併係數。假設有40個文件,咱們不會在四趟中每趟合併10個文件從而獲得4個文件。相反,第一趟只合並4個文件,隨後的三趟分別合併10個文件。再最後一趟中4個已合併的文件和餘下的6個(未合併的)文件合計10個文件。具體流程以下圖所示:

不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))
注意:這並無改變合併次數,它只是一個優化措施,目的是儘可能減小寫到磁盤的數據量,由於最後一趟老是直接合併到reduce。
看到這裏您是否理解了Shuffle的具體原理呢,若是沒有,也沒有關係,接下來咱們經過一個wordcount案例再將整個流程梳理一遍。
首先map任務的代碼以下:

public class WCMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
  public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
    String line = ivalue.toString();
    String words[] = line.split(" ");
    for (String word : words) {
      context.write(new Text(word), new LongWritable(1));
    }
  }
}

不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

在分區(分區規則:按首字母分四個區,分別爲a-i,j-q,r-z,其它)的過程當中,會將相同的單詞合併到一塊兒,將出現次數用逗號隔開,如上圖所示。注意此時尚未排序。分區代碼以下:

package cn.geekmooc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WCPatitioner extends Partitioner<Text, LongWritable> {
  @Override
  public int getPartition(Text key, LongWritable value, int numPartitions) {
    int first_char = key.charAt(0);
    if(first_char>=97&&first_char<=105){
      return 0;
    }else if(first_char>=106&&first_char<=113){
      return 1;
    }else if(first_char>=114&&first_char<=122){
      return 2;
    }else{
      return 3;
    }
  }
}

接着執行排序操做,默認排序規則是按照key的字典升序排序,固然你也能夠指定排序規則,排序後以下圖所示:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

接下來執行combiner操做,將每一個單詞後續的1求和,WCCombiner類代碼以下:

package cn.geekmooc;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,
      Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    System.out.println(key.toString()+":"+values.toString());
    long count = 0;
    Iterator<LongWritable> iter = values.iterator();
    while(iter.hasNext()){
      count += iter.next().get();
    }
    context.write(key, new LongWritable(count));
  }
}

combiner的結果以下圖所示

不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))
map任務執行完,產生N個spill文件,接着對N個文件進行合併,分如下兩種狀況:
1.N < 3,不管是否指定combiner類,合併文件時都不會執行combiner
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

2.N>=3,若是指定了combiner類將執行combiner操做,以下圖:
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

接下來進入fetch(或copy)階段
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

而後在reduce端進行合併
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

而後執行最後一趟合併,並將結果直接傳給reduce
不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

reduce類代碼以下:

package cn.geekmooc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,
      Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    System.out.println(key.toString()+":"+values.toString());
    long count = 0;
    for (LongWritable val : values) {
      count += val.get();
    }
    context.write(key, new LongWritable(count));
  }
}

reduce task執行後,輸出結果:

不懂Hadoop心臟Shuffle的原理這一篇就夠了(含講解視頻))

看講解視頻

相關文章
相關標籤/搜索