Hadoop之MapReduce理論篇02

1. ReduceTask工做機制

1.1. 設置ReduceTask 

ReduceTask 的並行度一樣影響整個 job 的執行併發度和執行效率,但與 MapTask 的併發數由切片數決定不一樣,ReduceTask 數量的決定是能夠直接手動設置:java

// 默認值是1,手動設置爲4
job.setNumReduceTasks(4);

1.2. 注意

(1) 若是數據分佈不均勻,就有可能在 Reduce 階段產生數據傾斜;
(2) ReduceTask 數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有 1 個 ReduceTask;
(3) 具體多少個 ReduceTask,須要根據集羣性能而定;
(4) 若是分區數不是 1,可是 ReduceTask 爲 1,是否執行分區過程。答案是:不執行分區過程。由於在 MapTask 的源碼中,執行分區的前提是先判斷 ReduceNum 個數是否大於 1。不大於 1 確定不執行。算法

1.3. 實驗:測試ReduceTask多少合適

(1) 實驗環境:1 個 master 節點,16 個 slave 節點: CPU:8GHZ , 內存: 2G,MapTask 數量爲 16,測試數據量爲 1G;
(2) 實驗結論:apache

Reduce task服務器

1網絡

5併發

10app

15框架

16分佈式

20ide

25

30

45

60

總時間

892

146

110

92

88

100

128

101

145

104

1.4. ReduceTask工做機制

(1) Copy 階段:ReduceTask 從各個 MapTask 上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中;
(2) Merge 階段:在遠程拷貝數據的同時,ReduceTask 啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多;
(3) Sort 階段:按照 MapReduce 語義,用戶編寫 reduce() 函數輸入數據是按 key 進行彙集的一組數據。爲了將 key 相同的數據聚在一塊兒,Hadoop 採用了基於排序的策略。因爲各個 MapTask 已經實現對本身的處理結果進行了局部排序,所以,ReduceTask 只需對全部數據進行一次歸併排序便可;
(4) Reduce 階段:reduce() 函數將計算結果寫到 HDFS 上。

2. 自定義OutputFormat

2.1. 基本概念

要在一個 mapreduce 程序中根據數據的不一樣輸出兩類結果到不一樣目錄,這類靈活的輸出需求能夠經過自定義 outputformat 來實現:
(1) 自定義 outputformat;
(2) 改寫 recordwriter,具體改寫輸出數據的方法 write();

2.2. 案例實操

2.2.1. 需求

使用自定義 OutputFormat 實現過濾日誌及自定義日誌輸出路徑:
過濾輸入的 log 日誌中是否包含 bigdata
    (1)包含 bigdata 的日誌輸出到 e:/bigdata.log
    (2)不包含 bigdata 的日誌輸出到 e:/other.log

2.2.2. 代碼實現

(1) 自定義一個 outputformat

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		// 建立一個RecordWriter
		return new FilterRecordWriter(job);
	}
}

(2) 具體的寫數據 RecordWriter

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream bigdataOut = null;
	FSDataOutputStream otherOut = null;

	public FilterRecordWriter(TaskAttemptContext job) {
		// 1 獲取文件系統
		FileSystem fs;

		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 建立輸出文件路徑
			Path bigdataPath = new Path("e:/bigdata.log");
			Path otherPath = new Path("e:/other.log");

			// 3 建立輸出流
			bigdataOut = fs.create(bigdataPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {

		// 判斷是否包含「bigdata」輸出到不一樣文件
		if (key.toString().contains("bigdata")) {
			bigdataOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// 關閉資源
		if (bigdataOut!= null) {
			bigdataOut.close();
		}
		
		if (otherOut != null) {
			otherOut.close();
		}
	}
}

(3) 編寫 FilterMapper

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1 獲取一行
		String line = value.toString();
		
		k.set(line);
		
		// 3 寫出
		context.write(k, NullWritable.get());
	}
}

(4) 編寫 FilterReducer

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {

		String k = key.toString();
		k = k + "\r\n";

		context.write(new Text(k), NullWritable.get());
	}
}

(5) 編寫 FilterDriver

package com.test.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 要將自定義的輸出格式組件設置到job中
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 雖然咱們自定義了outputformat,可是由於咱們的outputformat繼承自fileoutputformat
		// 而fileoutputformat要輸出一個_SUCCESS文件,因此,在這還得指定一個輸出目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

3. MapReduce數據壓縮

3.1. 基本概念 

      壓縮技術可以有效減小底層存儲系統 (HDFS) 讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在 Hadood 下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下, I/O 操做和網絡數據傳輸要花大量的時間。還有,Shuffle 與 Merge 過程一樣也面臨着巨大的 I/O 壓力。
      鑑於磁盤 I/O 和網絡帶寬是 Hadoop 的寶貴資源,數據壓縮對於節省資源、最小化磁盤 I/O 和網絡傳輸很是有幫助。不過,儘管壓縮與解壓操做的 CPU 開銷不高,其性能的提高和資源的節省並不是沒有代價。
      若是磁盤 I/O 和網絡帶寬影響了 MapReduce 做業性能,在任意 MapReduce 階段啓用壓縮均可以改善端到端處理時間並減小 I/O 和網絡流量。
      壓縮 MapReduce 的一種優化策略:經過壓縮編碼對 Mapper 或者 Reducer 的輸出進行壓縮,以減小磁盤 IO,提升MR程序運行速度(但相應增長了 CPU 運算負擔)
      注意:壓縮特性運用得當能提升性能,但運用不當也可能下降性能
      基本原則:
(1) 運算密集型的 job,少用壓縮
(2) IO 密集型的 job,多用壓縮

3.2. MR 支持的壓縮編碼

壓縮格式

工具

算法

文件擴展名

是否可切分

DEFAULT

DEFAULT

.deflate

Gzip

gzip

DEFAULT

.gz

bzip2

bzip2

bzip2

.bz2

LZO

lzop

LZO

.lzo

LZ4

LZ4

.lz4

Snappy

Snappy

.snappy

爲了支持多種壓縮/解壓縮算法,Hadoop 引入了編碼/解碼類,以下表所示:

壓縮格式

對應的編碼/解碼類

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

LZO

com.hadoop.compression.lzo.LzopCodec

LZ4

org.apache.hadoop.io.compress.Lz4Codec

Snappy

org.apache.hadoop.io.compress.SnappyCodec

壓縮性能的比較:

壓縮算法

原始文件大小

壓縮文件大小

壓縮速度

解壓速度

gzip

8.3GB

1.8GB

17.5MB/s

58MB/s

bzip2

8.3GB

1.1GB

2.4MB/s

9.5MB/s

LZO-bset

8.3GB

2GB

4MB/s

60.6MB/s

LZO

8.3GB

2.9GB

49.3MB/s

74.6MB/s

3.3. 採用壓縮的位置

(1) 輸入壓縮:
      在有大量數據並計劃重複處理的狀況下,應該考慮對輸入進行壓縮。然而,你無須顯示指定使用的編解碼方式。Hadoop 自動檢查文件擴展名,若是擴展名可以匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓。不然,Hadoop 就不會使用任何編解碼。
(2) 壓縮 mapper 輸出:
當 map 任務輸出的中間數據量很大時,應考慮在此階段採用壓縮技術。這能顯著改善內部數據 Shuffle 過程,而 Shuffle 過程在 Hadoop 處理過程當中是資源消耗最多的環節。若是發現數據量大形成網絡傳輸緩慢,應該考慮使用壓縮技術。可用於壓縮 mapper 輸出的快速編解碼包括 LZO、LZ4 或者 Snappy。
      注:LZO 是供 Hadoop 壓縮數據用的通用壓縮編解碼。其設計目標是達到與硬盤讀取速度至關的壓縮速度,所以速度是優先考慮的因素,而不是壓縮率。與 gzip 編解碼相比,它的壓縮速度是 gzip 的 5 倍,而解壓速度是 gzip 的 2 倍。同一個文件用 LZO 壓縮後比用 gzip 壓縮後大 50%,但比壓縮前小 25%~50%。這對改善性能很是有利,map 階段完成時間快4倍。
(3) 壓縮 reducer 輸出:
      在此階段啓用壓縮技術可以減小要存儲的數據量,所以下降所需的磁盤空間。當 mapreduce 做業造成做業鏈條時,由於第二個做業的輸入也已壓縮,因此啓用壓縮一樣有效。

4. Yarn資源管理

4.1. 基本概念

Yarn 是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而 mapreduce 等運算程序則至關於運行於操做系統之上的應用程序。

4.2. Yarn 的重要概念

(1) Yarn 並不清楚用戶提交的程序的運行機制;
(2) Yarn 只提供運算資源的調度 (用戶程序向 Yarn 申請資源,Yarn 就負責分配資源);
(3) Yarn 中的主管角色叫 ResourceManager;
(4) Yarn 中具體提供運算資源的角色叫NodeManager;
(5) 這樣一來,Yarn 其實就與運行的用戶程序徹底解耦,就意味着 Yarn 上能夠運行各類類型的分佈式運算程序 (mapreduce 只是其中的一種),好比 mapreduce、storm 程序,spark程序等;
(6) 因此,spark、storm 等運算框架均可以整合在 Yarn 上運行,只要他們各自的框架中有符合 Yarn 規範的資源請求機制便可;
(7) Yarn 就成爲一個通用的資源調度平臺,今後,企業中之前存在的各類運算集羣均可以整合在一個物理集羣上,提升資源利用率,方便數據共享。

4.3. Yarn 的工做機制

(0) Mr 程序提交到客戶端所在的節點;
(1) YarnRunner 向 Resourcemanager 申請一個 application;
(2) RM 將該應用程序的資源路徑返回給 YarnRunner;
(3) 該程序將運行所需資源提交到 HDFS 上;
(4) 程序資源提交完畢後,申請運行 MRAppMaster;
(5) RM 將用戶的請求初始化成一個 Task;
(6) 其中一個 NodeManager 領取到 Task 任務;
(7) 該 NodeManager 建立容器 Container,併產生 MRAppmaster;
(8) Container 從 HDFS 上拷貝資源到本地;
(9) MRAppmaster 向 RM 申請運行 MapTask 容器;
(10) RM 將運行 MaptTask 任務分配給另外兩個 NodeManager,另兩個 NodeManager 分別領取任務並建立容器;
(11) MR 向兩個接收到任務的 NodeManager 發送程序啓動腳本,這兩個 NodeManager 分別啓動 MapTask,MapTask 對數據分區排序;
(12) MRAppmaster 向 RM 申請 2 個容器,運行 ReduceTask;
(13) ReduceTask 向 MapTask 獲取相應分區的數據;
(14) 程序運行完畢後,MR 會向 RM 註銷本身。
 

本文爲原創文章,若是對你有一點點的幫助,別忘了點贊哦!比心!如需轉載,請註明出處,謝謝!

相關文章
相關標籤/搜索