shuffle是MapReduce的核心,map和reduce的中間過程。java
Map負責過濾分發,reduce歸併整理,從map輸出到reduce輸入就是shuffle過程。apache
決定當前key交給哪一個reduce處理網絡
默認:按照key的hash值對reduce的個數取餘進行分區app
將相同key的value合併oop
按照key對每個keyvalue進行排序,字典排序優化
每個map task處理的結果會進入環形緩衝區(內存100M)spa
對每一條key進行分區(標上交給哪一個reduce)線程
hadoop 1 reduce0 hive 1 reduce0 spark 1 reduce1 hadoop 1 reduce0 hbase 1 reduce1
按照key排序,將相同分區的數據進行分區內排序3d
hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1
當整個緩衝區達到閾值80%,開始進行溢寫code
將當前分區排序後的數據寫入磁盤變成一個文件file1
最終生成多個spill小文件
能夠在mapred-site.xml中設置內存的大小和溢寫的閾值
在mapred-site.xml中設置內存的大小 <property> <name>mapreduce.task.io.sort.mb</name> <value>100</value> </property> 在mapred-site.xml中設置內存溢寫的閾值 <property> <name>mapreduce.task.io.sort.spill.percent</name> <value>0.8</value> </property>
將spill生成的多個小文件進行合併
排序:將相同分區的數據進行分區內排序,實現comparator比較器進行比較。最終造成一個文件。
file1 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1 file2 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hbase 1 reduce1 spark 1 reduce1 end_file: hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1
map task 結束,通知app master,app master通知reduce拉取數據
map task1 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1 map task2 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hbase 1 reduce1 hbase 1 reduce1 spark 1 reduce1 spark 1 reduce1
reduce啓動多個線程經過http到每臺機器上拉取屬於本身分區的數據
reduce0: hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hadoop 1 reduce0 hive 1 reduce0 hive 1 reduce0 hive 1 reduce0 hive 1 reduce0
merge:合併,將每一個map task的結果中屬於本身的分區數據進行合併
排序:對總體屬於我分區的數據進行排序
分組:對相同key的value進行合併,使用comparable完成比較。
hadoop,list<1,1,1,1,1,1,1,1> hive,list<1,1,1,1>
在map階段提早進行一次合併。通常等同於提早執行reduce
job.setCombinerClass(WCReduce.class);
壓縮中間結果集,減小磁盤IO以及網絡IO
1.default:全部hadoop中默認的配置項 2.site:用於自定義配置文件,若是修改之後必須重啓生效 3.conf對象配置每一個程序的自定義配置 4.運行時經過參數實現用戶自定義配置 bin/yarn jar xx.jar -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec main_class input_path ouput_path
bin/hadoop checknative
public static void main(String[] args) { Configuration configuration = new Configuration(); //配置map中間結果集壓縮 configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec"); //配置reduce結果集壓縮 configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.Lz4Codec"); try { int status = ToolRunner.run(configuration, new MRDriver(), args); System.exit(status); } catch (Exception e) { e.printStackTrace(); } }
經過自定義配置文件site-xml
針對Map Output數據進行壓縮設置
mapreduce.map.output.compress=true mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
運行官方MapReduce案例 wordcount 可在Yarn上看到