MapReduce中shuffle過程

shuffle是MapReduce的核心,map和reduce的中間過程。java

Map負責過濾分發,reduce歸併整理,從map輸出到reduce輸入就是shuffle過程。apache

實現的功能

分區

決定當前key交給哪一個reduce處理網絡

默認:按照key的hash值對reduce的個數取餘進行分區app

 

分組

將相同key的value合併oop

排序

按照key對每個keyvalue進行排序,字典排序優化

過程

 

map端shuffle

spill階段:溢寫

每個map task處理的結果會進入環形緩衝區(內存100M)spa

分區

對每一條key進行分區(標上交給哪一個reduce)線程

hadoop      1       reduce0
hive        1       reduce0
spark       1       reduce1
hadoop      1       reduce0
hbase       1       reduce1
排序

按照key排序,將相同分區的數據進行分區內排序3d

hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
溢寫

當整個緩衝區達到閾值80%,開始進行溢寫code


將當前分區排序後的數據寫入磁盤變成一個文件file1
最終生成多個spill小文件

能夠在mapred-site.xml中設置內存的大小和溢寫的閾值

在mapred-site.xml中設置內存的大小
​
    <property>
​
      <name>mapreduce.task.io.sort.mb</name>
​
      <value>100</value>
​
    </property>
​
在mapred-site.xml中設置內存溢寫的閾值  
​
    <property>
​
      <name>mapreduce.task.io.sort.spill.percent</name>
​
      <value>0.8</value>
​
    </property>

 

merge:合併

將spill生成的多個小文件進行合併

排序:將相同分區的數據進行分區內排序,實現comparator比較器進行比較。最終造成一個文件。

 
file1
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
​
file2
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hbase       1       reduce1
spark       1       reduce1
​
end_file:
hadoop      1       reduce0
hadoop      1       reduce0
hadoop      1       reduce0
hadoop      1       reduce0
hive        1       reduce0
hive        1       reduce0
hbase       1       reduce1
hbase       1       reduce1
spark       1       reduce1
spark       1       reduce1

map task 結束,通知app master,app master通知reduce拉取數據

reduce端shuffle

map task1
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hbase       1       reduce1
        hbase       1       reduce1
        spark       1       reduce1
        spark       1       reduce1
map task2
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hadoop      1       reduce0
        hive        1       reduce0
        hive        1       reduce0
        hbase       1       reduce1
        hbase       1       reduce1
        spark       1       reduce1
        spark       1       reduce1

reduce啓動多個線程經過http到每臺機器上拉取屬於本身分區的數據

reduce0:
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hadoop      1       reduce0
    hive        1       reduce0
    hive        1       reduce0
    hive        1       reduce0
    hive        1       reduce0

merge:合併,將每一個map task的結果中屬於本身的分區數據進行合併

排序:對總體屬於我分區的數據進行排序

分組:對相同key的value進行合併,使用comparable完成比較。

hadoop,list<1,1,1,1,1,1,1,1>
hive,list<1,1,1,1>

  

優化

combine

在map階段提早進行一次合併。通常等同於提早執行reduce

 
job.setCombinerClass(WCReduce.class);

compress

壓縮中間結果集,減小磁盤IO以及網絡IO

壓縮配置方式

1.default:全部hadoop中默認的配置項
2.site:用於自定義配置文件,若是修改之後必須重啓生效
3.conf對象配置每一個程序的自定義配置
4.運行時經過參數實現用戶自定義配置
bin/yarn jar xx.jar -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec main_class input_path ouput_path

查看本地庫支持哪些壓縮

bin/hadoop checknative

經過conf配置對象配置壓縮

public static void main(String[] args) {
        Configuration configuration = new Configuration();
        //配置map中間結果集壓縮
        configuration.set("mapreduce.map.output.compress","true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
        //配置reduce結果集壓縮
        configuration.set("mapreduce.output.fileoutputformat.compress","true");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec");
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
}

 

經過自定義配置文件site-xml   

針對Map Output數據進行壓縮設置

對於MR程序來講:
提交任務的時候使用參數配置
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
 
 

運行官方MapReduce案例 wordcount    可在Yarn上看到

 

相關文章
相關標籤/搜索