ReduceTask 的並行度一樣影響整個 job 的執行併發度和執行效率,但與 MapTask 的併發數由切片數決定不一樣,ReduceTask 數量的決定是能夠直接手動設置:java
// 默認值是1,手動設置爲4 job.setNumReduceTasks(4);
(1) 若是數據分佈不均勻,就有可能在 Reduce 階段產生數據傾斜;
(2) ReduceTask 數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有 1 個 ReduceTask;
(3) 具體多少個 ReduceTask,須要根據集羣性能而定;
(4) 若是分區數不是 1,可是 ReduceTask 爲 1,是否執行分區過程。答案是:不執行分區過程。由於在 MapTask 的源碼中,執行分區的前提是先判斷 ReduceNum 個數是否大於 1。不大於 1 確定不執行。算法
(1) 實驗環境:1 個 master 節點,16 個 slave 節點: CPU:8GHZ , 內存: 2G,MapTask 數量爲 16,測試數據量爲 1G;
(2) 實驗結論:apache
Reduce task服務器 |
1網絡 |
5併發 |
10app |
15框架 |
16分佈式 |
20ide |
25 |
30 |
45 |
60 |
總時間 |
892 |
146 |
110 |
92 |
88 |
100 |
128 |
101 |
145 |
104 |
(1) Copy 階段:ReduceTask 從各個 MapTask 上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中;
(2) Merge 階段:在遠程拷貝數據的同時,ReduceTask 啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多;
(3) Sort 階段:按照 MapReduce 語義,用戶編寫 reduce() 函數輸入數據是按 key 進行彙集的一組數據。爲了將 key 相同的數據聚在一塊兒,Hadoop 採用了基於排序的策略。因爲各個 MapTask 已經實現對本身的處理結果進行了局部排序,所以,ReduceTask 只需對全部數據進行一次歸併排序便可;
(4) Reduce 階段:reduce() 函數將計算結果寫到 HDFS 上。
要在一個 mapreduce 程序中根據數據的不一樣輸出兩類結果到不一樣目錄,這類靈活的輸出需求能夠經過自定義 outputformat 來實現:
(1) 自定義 outputformat;
(2) 改寫 recordwriter,具體改寫輸出數據的方法 write();
使用自定義 OutputFormat 實現過濾日誌及自定義日誌輸出路徑:
過濾輸入的 log 日誌中是否包含 bigdata
(1)包含 bigdata 的日誌輸出到 e:/bigdata.log
(2)不包含 bigdata 的日誌輸出到 e:/other.log
(1) 自定義一個 outputformat
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 建立一個RecordWriter return new FilterRecordWriter(job); } }
(2) 具體的寫數據 RecordWriter
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream bigdataOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 獲取文件系統 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 建立輸出文件路徑 Path bigdataPath = new Path("e:/bigdata.log"); Path otherPath = new Path("e:/other.log"); // 3 建立輸出流 bigdataOut = fs.create(bigdataPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判斷是否包含「bigdata」輸出到不一樣文件 if (key.toString().contains("bigdata")) { bigdataOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 關閉資源 if (bigdataOut!= null) { bigdataOut.close(); } if (otherOut != null) { otherOut.close(); } } }
(3) 編寫 FilterMapper
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 獲取一行 String line = value.toString(); k.set(line); // 3 寫出 context.write(k, NullWritable.get()); } }
(4) 編寫 FilterReducer
package com.test.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { String k = key.toString(); k = k + "\r\n"; context.write(new Text(k), NullWritable.get()); } }
(5) 編寫 FilterDriver
package com.test.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要將自定義的輸出格式組件設置到job中 job.setOutputFormatClass(FilterOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 雖然咱們自定義了outputformat,可是由於咱們的outputformat繼承自fileoutputformat // 而fileoutputformat要輸出一個_SUCCESS文件,因此,在這還得指定一個輸出目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
壓縮技術可以有效減小底層存儲系統 (HDFS) 讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在 Hadood 下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下, I/O 操做和網絡數據傳輸要花大量的時間。還有,Shuffle 與 Merge 過程一樣也面臨着巨大的 I/O 壓力。
鑑於磁盤 I/O 和網絡帶寬是 Hadoop 的寶貴資源,數據壓縮對於節省資源、最小化磁盤 I/O 和網絡傳輸很是有幫助。不過,儘管壓縮與解壓操做的 CPU 開銷不高,其性能的提高和資源的節省並不是沒有代價。
若是磁盤 I/O 和網絡帶寬影響了 MapReduce 做業性能,在任意 MapReduce 階段啓用壓縮均可以改善端到端處理時間並減小 I/O 和網絡流量。
壓縮 MapReduce 的一種優化策略:經過壓縮編碼對 Mapper 或者 Reducer 的輸出進行壓縮,以減小磁盤 IO,提升MR程序運行速度(但相應增長了 CPU 運算負擔)
注意:壓縮特性運用得當能提升性能,但運用不當也可能下降性能
基本原則:
(1) 運算密集型的 job,少用壓縮
(2) IO 密集型的 job,多用壓縮
壓縮格式 |
工具 |
算法 |
文件擴展名 |
是否可切分 |
DEFAULT |
無 |
DEFAULT |
.deflate |
否 |
Gzip |
gzip |
DEFAULT |
.gz |
否 |
bzip2 |
bzip2 |
bzip2 |
.bz2 |
是 |
LZO |
lzop |
LZO |
.lzo |
否 |
LZ4 |
無 |
LZ4 |
.lz4 |
否 |
Snappy |
無 |
Snappy |
.snappy |
否 |
爲了支持多種壓縮/解壓縮算法,Hadoop 引入了編碼/解碼類,以下表所示:
壓縮格式 |
對應的編碼/解碼類 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
LZ4 |
org.apache.hadoop.io.compress.Lz4Codec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
壓縮性能的比較:
壓縮算法 |
原始文件大小 |
壓縮文件大小 |
壓縮速度 |
解壓速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO-bset |
8.3GB |
2GB |
4MB/s |
60.6MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
(1) 輸入壓縮:
在有大量數據並計劃重複處理的狀況下,應該考慮對輸入進行壓縮。然而,你無須顯示指定使用的編解碼方式。Hadoop 自動檢查文件擴展名,若是擴展名可以匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓。不然,Hadoop 就不會使用任何編解碼。
(2) 壓縮 mapper 輸出:
當 map 任務輸出的中間數據量很大時,應考慮在此階段採用壓縮技術。這能顯著改善內部數據 Shuffle 過程,而 Shuffle 過程在 Hadoop 處理過程當中是資源消耗最多的環節。若是發現數據量大形成網絡傳輸緩慢,應該考慮使用壓縮技術。可用於壓縮 mapper 輸出的快速編解碼包括 LZO、LZ4 或者 Snappy。
注:LZO 是供 Hadoop 壓縮數據用的通用壓縮編解碼。其設計目標是達到與硬盤讀取速度至關的壓縮速度,所以速度是優先考慮的因素,而不是壓縮率。與 gzip 編解碼相比,它的壓縮速度是 gzip 的 5 倍,而解壓速度是 gzip 的 2 倍。同一個文件用 LZO 壓縮後比用 gzip 壓縮後大 50%,但比壓縮前小 25%~50%。這對改善性能很是有利,map 階段完成時間快4倍。
(3) 壓縮 reducer 輸出:
在此階段啓用壓縮技術可以減小要存儲的數據量,所以下降所需的磁盤空間。當 mapreduce 做業造成做業鏈條時,由於第二個做業的輸入也已壓縮,因此啓用壓縮一樣有效。
Yarn 是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而 mapreduce 等運算程序則至關於運行於操做系統之上的應用程序。
(1) Yarn 並不清楚用戶提交的程序的運行機制;
(2) Yarn 只提供運算資源的調度 (用戶程序向 Yarn 申請資源,Yarn 就負責分配資源);
(3) Yarn 中的主管角色叫 ResourceManager;
(4) Yarn 中具體提供運算資源的角色叫NodeManager;
(5) 這樣一來,Yarn 其實就與運行的用戶程序徹底解耦,就意味着 Yarn 上能夠運行各類類型的分佈式運算程序 (mapreduce 只是其中的一種),好比 mapreduce、storm 程序,spark程序等;
(6) 因此,spark、storm 等運算框架均可以整合在 Yarn 上運行,只要他們各自的框架中有符合 Yarn 規範的資源請求機制便可;
(7) Yarn 就成爲一個通用的資源調度平臺,今後,企業中之前存在的各類運算集羣均可以整合在一個物理集羣上,提升資源利用率,方便數據共享。
(0) Mr 程序提交到客戶端所在的節點;
(1) YarnRunner 向 Resourcemanager 申請一個 application;
(2) RM 將該應用程序的資源路徑返回給 YarnRunner;
(3) 該程序將運行所需資源提交到 HDFS 上;
(4) 程序資源提交完畢後,申請運行 MRAppMaster;
(5) RM 將用戶的請求初始化成一個 Task;
(6) 其中一個 NodeManager 領取到 Task 任務;
(7) 該 NodeManager 建立容器 Container,併產生 MRAppmaster;
(8) Container 從 HDFS 上拷貝資源到本地;
(9) MRAppmaster 向 RM 申請運行 MapTask 容器;
(10) RM 將運行 MaptTask 任務分配給另外兩個 NodeManager,另兩個 NodeManager 分別領取任務並建立容器;
(11) MR 向兩個接收到任務的 NodeManager 發送程序啓動腳本,這兩個 NodeManager 分別啓動 MapTask,MapTask 對數據分區排序;
(12) MRAppmaster 向 RM 申請 2 個容器,運行 ReduceTask;
(13) ReduceTask 向 MapTask 獲取相應分區的數據;
(14) 程序運行完畢後,MR 會向 RM 註銷本身。