Hadoop初級之Hadoop(Mapreduce)

1.       一MapReduce入門

1.1 MapReduce定義

Mapreduce是一個分佈式運算程序 +的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架;前端

Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。算法

1.2 MapReduce優缺點

1.2.1 優勢

1)MapReduce 易於編程。它簡單的實現一些接口,就能夠完成一個分佈式程序,這個分佈式程序能夠分佈到大量廉價的 PC 機器運行。也就是說你寫一個分佈式程序,跟寫一個簡單的串行程序是如出一轍的。 就是由於這個特色使得 MapReduce 編程變得很是流行。sql

2)良好的擴展性。當你的計算資源不能獲得知足的時候,你能夠經過簡單的增長機器來擴展它的計算能力。編程

3)高容錯性。MapReduce 設計的初衷就是使程序可以部署在廉價的 PC 機器上,這就要求它具備很高的容錯性。好比其中一臺機器掛了,它能夠把上面的計算任務轉移到另一個節點上面上運行,不至於這個任務運行失敗,並且這個過程不須要人工參與,而徹底是由 Hadoop 內部完成的。緩存

4)適合 PB 級以上海量數據的離線處理。這裏加紅字體離線處理,說明它適合離線處理而不適合在線處理。好比像毫秒級別的返回一個結果,MapReduce 很難作到。服務器

離線分析:網絡

實時分析:數據結構

1.2.2 缺點

MapReduce不擅長作實時計算、流式計算、DAG(有向圖)計算。架構

1)實時計算。MapReduce 沒法像 Mysql 同樣,在毫秒或者秒級內返回結果。併發

 

回滾:假如執行某一個操做失敗,會返回到執行此操做以前的狀態

招商銀行: ATM 2000RMB  2000

 

 

2)流式計算。流式計算的輸入數據時動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是由於 MapReduce 自身的設計特色決定了數據源必須是靜態的。spark

3)DAG(有向圖)計算。多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出。在這種狀況下,MapReduce 並非不能作,而是使用後,每一個MapReduce 做業的輸出結果都會寫入到磁盤,會形成大量的磁盤IO,致使性能很是的低下。

1.3 MapReduce核心思想

 

1)分佈式的運算程序每每須要分紅至少2個階段。

2)第一個階段的maptask併發實例,徹底並行運行,互不相干。

3)第二個階段的reduce task併發實例互不相干,可是他們的數據依賴於上一個階段的全部maptask併發實例的輸出。

4)MapReduce編程模型只能包含一個map階段和一個reduce階段,若是用戶的業務邏輯很是複雜,那就只能多個mapreduce程序,串行運行。

1.4 MapReduce進程

一個完整的mapreduce程序在分佈式運行時有三類實例進程:

1)MrAppMaster:負責整個程序的過程調度及狀態協調

2)MapTask:負責map階段的整個數據處理流程。

3)ReduceTask:負責reduce階段的整個數據處理流程。

1.5 MapReduce編程規範

用戶編寫的程序分紅三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)

1)Mapper階段

(1)用戶自定義的Mapper要繼承本身的父類 Mapper

(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)

(3)Mapper中的業務邏輯寫在map()方法中

(4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)

(5)map()方法(maptask進程)對每個<K,V>調用一次

2)Reducer階段

(1)用戶自定義的Reducer要繼承本身的父類

(2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV

(3)Reducer的業務邏輯寫在reduce()方法中

(4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法

3)Driver階段

整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象

1.6 MapReduce程序運行流程分析

 

1)在MapReduce程序讀取文件的輸入目錄上存放相應的文件。

 

2)客戶端程序在submit()方法執行前,獲取待處理的數據信息,而後根據集羣中參數的配置造成一個任務分配規劃。

3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啓動MRAppMaster。

4)MRAppMaster啓動後根據本次job的描述信息,計算出須要的maptask實例數量,而後向集羣申請機器啓動相應數量的maptask進程。

5)maptask利用客戶指定的inputformat來讀取數據,造成輸入KV對。

6)maptask將輸入KV對傳遞給客戶定義的map()方法,作邏輯運算

7)map()運算完畢後將KV對收集到maptask緩存。

8)maptask緩存中的KV對按照K分區排序後不斷寫到磁盤文件

9)MRAppMaster監控到全部maptask進程任務完成以後,會根據客戶指定的參數啓動相應數量的reducetask進程,並告知reducetask進程要處理的數據分區。

10)Reducetask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算。

11)Reducetask運算完畢後,調用客戶指定的outputformat將結果數據輸出到外部存儲。

 

2.       二 Hadoop序列化

2.1 爲何要序列化?

    通常來講,「活的」對象只生存在內存裏,關機斷電就沒有了。並且「活的」對象只能由本地的進程使用,不能被髮送到網絡上的另一臺計算機。 然而序列化能夠存儲「活的」對象,能夠將「活的」對象發送到遠程計算機。

2.2 什麼是序列化?

序列化就是把內存中的對象,轉換成字節序列(或其餘數據傳輸協議)以便於存儲(持久化)和網絡傳輸。

反序列化就是將收到字節序列(或其餘數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。

2.3 爲何不用Java的序列化?

Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。

2.4 爲何序列化對Hadoop很重要?

 

由於Hadoop在集羣之間進行通信或者RPC調用的時候,須要序列化,並且要求序列化要快,且體積要小,佔用帶寬要小。因此必須理解Hadoop的序列化機制。

 序列化和反序列化在分佈式數據處理領域常常出現:進程通訊和永久存儲。然而Hadoop中各個節點的通訊是經過遠程調用(RPC)實現的,那麼 RPC序列化要求具備如下特色:

1)緊湊:緊湊的格式能讓咱們能充分利用網絡帶寬,而帶寬是數據中心最稀缺的資源

2)快速:進程通訊造成了分佈式系統的骨架,因此須要儘可能減小序列化和反序列化的性能開銷,這是基本的;

3)可擴展:協議爲了知足新的需求變化,因此控制客戶端和服務器過程當中,須要直接引進相應的協議,這些是新協議,原序列化方式能支持新的協議報文;

4)互操做:能支持不一樣語言寫的客戶端和服務端進行交互;

2.5 經常使用數據序列化類型

 

2.6 自定義bean對象實現序列化接口(Writable)

1)自定義bean對象要想序列化傳輸,必須實現序列化接口,須要注意如下7項。

 

// 1 必須實現Writable接口

public class FlowBean implements Writable {

 

private long upFlow;

private long downFlow;

private long sumFlow;

 

//2 反序列化時,須要反射調用空參構造函數,因此必須有

public FlowBean() {

       super();

}

 

/**

 * 3重寫序列化方法

 *

 * @param out

 * @throws IOException

 */

@Override

public void write(DataOutput out) throws IOException {

       out.writeLong(upFlow);

       out.writeLong(downFlow);

       out.writeLong(sumFlow);

}

 

/**

 * 4 重寫反序列化方法

5 注意反序列化的順序和序列化的順序徹底一致

 *

 * @param in

 * @throws IOException

 */

@Override

public void readFields(DataInput in) throws IOException {

       upFlow = in.readLong();

       downFlow = in.readLong();

       sumFlow = in.readLong();

}

 

    // 6要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用

@Override

public String toString() {

       return upFlow + "\t" + downFlow + "\t" + sumFlow;

}

 

    //7 若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序

@Override

public int compareTo(FlowBean o) {

       // 倒序排列,從大到小

       return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

}

 

(1)必須實現Writable接口

(2)反序列化時,須要反射調用空參構造函數,因此必須有空參構造

(3)重寫序列化方法

(4)重寫反序列化方法

(5)注意反序列化的順序和序列化的順序徹底一致

(6)要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用

(7)若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序

3.       三 MapReduce框架原理

3.1 MapReduce工做流程

1)流程示意圖

 

2)流程詳解

上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,以下:

1)maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中

2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件

3)多個溢出文件會被合併成大的溢出文件

4)在溢出過程當中,及合併的過程當中,都要調用partitioner進行分區和針對key進行排序

5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據

6)reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)

7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)

3)注意

Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。

緩衝區的大小能夠經過參數調整,參數:io.sort.mb  默認100M

3.2 InputFormat數據輸入

 

文件是 MapReduce 任務數據的初始存儲地。正常狀況下,輸入文件通常是存儲在 HDFS 裏面。這些文件的格式能夠是任意的:咱們可使用基於行的日誌文件,也可使用二進制格式,多行輸入記錄或者其它一些格式。這些文件通常會很大,達到數十GB,甚至更大。那麼MapReduce是如何讀取這些數據的呢?下面咱們首先學習 InputFormat 接口。

3.2.1 自定義InputFormat

1)概述

(1)自定義一個類繼承FileInputFormat

(2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV

(3)在輸出時使用SequenceFileOutPutFormat輸出合併文件

3.2.2 FileInputFormat切片機制

FileInputFormat中默認的切片機制:

(1)簡單地按照文件的內容長度進行切片

(2)切片大小,默認等於block大小

(3)切片時不考慮數據集總體,而是逐個針對每個文件單獨切片

 

好比待處理數據有兩個文件:

file1.txt    320M

file2.txt    10M

 

通過FileInputFormat的切片機制運算後,造成的切片信息以下: 

file1.txt.split1--  0~128

file1.txt.split2--  128~256

file1.txt.split3--  256~320

file2.txt.split1--  0~10M

 

4)FileInputFormat切片大小的參數配置

 

(1)在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 

切片主要由這幾個值來運算決定

 

mapreduce.input.fileinputformat.split.minsize=1 默認值爲1

mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue

 

所以,默認狀況下,切片大小=blocksize。

 

maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值。

minsize(切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。

 

5)獲取切片信息API

 

// 根據文件類型獲取切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();

 

// 獲取切片的文件名稱

String name = inputSplit.getPath().getName();

 

3.2.3 CombineTextInputFormat切片機制

關於大量小文件的優化策略

1)默認狀況下TextInputformat對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣若是有大量小文件,就會產生大量的maptask,處理效率極其低下。

2)優化策略

(1)最好的辦法,在數據處理系統的最前端(預處理/採集),將小文件先合併成大文件,再上傳到HDFS作後續分析。

(2)補救措施:若是已是大量小文件在HDFS中了,可使用另外一種InputFormat來作切片(CombineFileInputFormat),它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask。

(3)優先知足最小切片大小,不超過最大切片大小

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具體實現步驟

// 9 若是不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class)

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

3.3 MapTask工做機制

3.3.1 並行度決定機制

1)問題引出

maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,mapTask並行任務是否越多越好呢?

2)MapTask並行度決定機制

一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。

3.3.2 MapTask工做機制

(1)Read階段:Map Task經過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。

(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value。

(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩衝區中。

(4)Spill階段:即「溢寫」,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。

 

溢寫階段詳情:

 

    步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,而後按照key進行排序。這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照key有序。

 

    步驟2:按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。若是用戶設置了Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做。

 

    步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。

 

(5)Combine階段:當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。
        當全部數據處理完後,MapTask會將全部臨時文件合併成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。
        在進行文件合併過程當中,MapTask以分區爲單位進行合併。對於某個分區,它將採用多輪遞歸合併的方式。每輪合併io.sort.factor(默認100)個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。
        讓每一個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

3.4 Shuffle機制

3.4.1 Shuffle機制

Mapreduce確保每一個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出做爲輸入傳給reducer)稱爲shuffle。

3.4.2 Partition分區

0)問題引出:要求將統計結果按照條件輸出到不一樣文件中(分區)。好比:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)

1)默認partition分區

 

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */

  public int getPartition(K key, V value, int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }

}

 

默認分區是根據key的hashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區。

2)自定義Partitioner步驟

(1)自定義類繼承Partitioner,重寫getPartition()方法

 

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

@Override

public int getPartition(Text key, FlowBean value, int numPartitions) {

 

// 1 獲取電話號碼的前三位

       String preNum = key.toString().substring(0, 3);

      

       int partition = 4;

      

       // 2 判斷是哪一個省

       if ("136".equals(preNum)) {

              partition = 0;

       }else if ("137".equals(preNum)) {

              partition = 1;

       }else if ("138".equals(preNum)) {

              partition = 2;

       }else if ("139".equals(preNum)) {

              partition = 3;

       }

       return partition;

}

}

 

(2)在job驅動中,設置自定義partitioner:

 

job.setPartitionerClass(CustomPartitioner.class);

(3)自定義partition後,要根據自定義partitioner的邏輯設置相應數量的reduce task

 

job.setNumReduceTasks(5);

3)注意:

若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;

若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;

若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;

例如:假設自定義分區數爲5,則

(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件

(2)job.setNumReduceTasks(2);會報錯

(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件

3.4.3 WritableComparable排序

排序是MapReduce框架中最重要的操做之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。

對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。

對於Reduce Task,它從每一個Map Task上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。

1)排序的分類:

(1)部分排序:

MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每一個文件內部排序。

(2)全排序:

如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構。

替代方案:首先建立一系列排好序的文件;其次,串聯這些文件;最後,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。

(3)輔助排序:(GroupingComparator分組)

Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。

(4)二次排序:

在自定義排序過程當中,若是compareTo中的判斷條件爲兩個即爲二次排序。

2)自定義排序WritableComparable

(1)原理分析
bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序

 

@Override

public int compareTo(FlowBean o) {

    // 倒序排列,從大到小

    return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

3.4.4 GroupingComparator分組(輔助排序)

1)對reduce階段的數據根據某一個或幾個字段進行分組。

3.4.5 Combiner合併

1)combiner是MR程序中Mapper和Reducer以外的一種組件

2)combiner組件的父類就是Reducer

3)combiner和reducer的區別在於運行的位置:

Combiner是在每個maptask所在的節點運行

Reducer是接收全局全部Mapper的輸出結果;

4)combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量

5)combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來

 

Mapper

3 5 7 ->(3+5+7)/3=5

2 6 ->(2+6)/2=4

 

Reducer

(3+5+7+2+6)/5=23/5    不等於    (5+4)/2=9/2

 

6)自定義Combiner實現步驟:

(1)自定義一個combiner繼承Reducer,重寫reduce方法

 

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,

                  Context context) throws IOException, InterruptedException {

 

           int count = 0;

 

           for(IntWritable v :values){

                  count = v.get();

           }

           context.write(key, new IntWritable(count));

    }

}

 

(2)在job驅動類中設置: 

 

job.setCombinerClass(WordcountCombiner.class);

3.5 ReduceTask工做機制

1)設置ReduceTask

reducetask的並行度一樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不一樣,Reducetask數量的決定是能夠直接手動設置:

 

//默認值是1,手動設置爲4

job.setNumReduceTasks(4);

 

2)注意

(1)reducetask=0   ,表示沒有reduce階段,輸出文件個數和map個數一致。

(2)reducetask默認值就是1,因此輸出文件個數爲一個。

(3)若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜

(4)reducetask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask。

(5)具體多少個reducetask,須要根據集羣性能而定。

(6)若是分區數不是1,可是reducetask爲1,是否執行分區過程。答案是:不執行分區過程。由於在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1確定不執行。

3)實驗:測試reducetask多少合適。

4)ReduceTask工做機制

(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中。

(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。

(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行彙集的一組數據。爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。

(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。

3.6 OutputFormat數據輸出

3.6.1 OutputFormat接口實現類

 OutputFormat是MapReduce輸出的基類,全部實現MapReduce輸出都實現了 OutputFormat接口。下面咱們介紹幾種常見的OutputFormat實現類。

1)文本輸出TextOutputFormat

默認的輸出格式是TextOutputFormat,它把每條記錄寫爲文本行。它的鍵和值能夠是任意類型,由於TextOutputFormat調用toString()方法把它們轉換爲字符串。

2)SequenceFileOutputFormat

 SequenceFileOutputFormat將它的輸出寫爲一個順序文件。若是輸出須要做爲後續 MapReduce任務的輸入,這即是一種好的輸出格式,由於它的格式緊湊,很容易被壓縮。

3)自定義OutputFormat

根據用戶需求,自定義實現輸出。

3.6.2 自定義OutputFormat

爲了實現控制最終文件的輸出路徑,能夠自定義OutputFormat。

要在一個mapreduce程序中根據數據的不一樣輸出兩類結果到不一樣目錄,這類靈活的輸出需求能夠經過自定義outputformat來實現。

1)自定義OutputFormat步驟

(1)自定義一個類繼承FileOutputFormat。

(2)改寫recordwriter,具體改寫輸出數據的方法write()。

3.7 計數器應用

Hadoop爲每一個做業維護若干內置計數器,以描述多項指標。例如,某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量。

1)API

(1)採用枚舉的方式統計計數

 

enum MyCounter{MALFORORMED,NORMAL}

 

//對枚舉定義的自定義計數器加1

context.getCounter(MyCounter.MALFORORMED).increment(1);

 

(2)採用計數器組、計數器名稱的方式統計

 

context.getCounter("counterGroup", "countera").increment(1);

組名和計數器名稱隨便起,但最好有意義。

 

(3)計數結果在程序運行後的控制檯上查看。

3.8 Join多種應用

3.8.1 Reduce join

1)原理:

Map端的主要工做:爲來自不一樣表(文件)的key/value對打標籤以區別不一樣來源的記錄。而後用鏈接字段做爲key,其他部分和新加的標誌做爲value,最後進行輸出。

reduce端的主要工做:在reduce端以鏈接字段做爲key的分組已經完成,咱們只須要在每個分組當中將那些來源於不一樣文件的記錄(在map階段已經打標誌)分開,最後進行合併就ok了

2)該方法的缺點

這裏主要分析一下reduce join的一些不足。之因此會存在reduce join這種方式,是由於總體數據被分割了,每一個map task只處理一部分數據而不可以獲取到全部須要的join字段,所以咱們能夠充分利用mapreduce框架的特性,讓他按照join key進行分區,將全部join key相同的記錄集中起來進行處理,因此reduce join這種方式就出現了。

這種方式的缺點很明顯就是會形成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。

3.8.2 Map join

1)使用場景:一張表十分小、一張表很大。

2)使用方法:

在提交做業的時候先將小表文件放到該做業的DistributedCache中,而後從DistributeCache中取出該小表進行join (好比放到Hash Map等等容器中)。而後掃描大表,看大表中的每條記錄的join key/value值是否可以在內存中找到相同join key的記錄,若是有則直接輸出結果。

3.8.3 Distributedcache分佈式緩存

1)數據傾斜緣由

若是是多張表的操做都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。

2)解決方案

在map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。

3)具體辦法:採用distributedcache

(1)在mapper的setup階段,將文件讀取到緩存集合中

(2)在驅動函數中加載緩存。

job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節點

3.9 數據清洗

1)概述

在運行核心業務Mapreduce程序以前,每每要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程每每只須要運行mapper程序,不須要運行reduce程序。

3.10 MapReduce開發總結

 

在編寫mapreduce程序時,須要考慮的幾個方面:

1)輸入數據接口:InputFormat

   默認使用的實現類是:TextInputFormat

   TextInputFormat的功能邏輯是:一次讀一行文本,而後將該行的起始偏移量做爲key,行內容做爲value返回

CombineTextInputFormat能夠把多個小文件合併成一個切片處理,提升處理效率。

用戶還能夠自定義InputFormat。

 

2)邏輯處理接口:Mapper 

   用戶根據業務需求實現其中三個方法:map()   setup()   cleanup ()

 

3)Partitioner分區

    有默認實現 HashPartitioner,邏輯是根據key的哈希值和numReduces來返回一個分區號;key.hashCode()&Integer.MAXVALUE % numReduces

    若是業務上有特別的需求,能夠自定義分區。

 

4)Comparable排序

    當咱們用自定義的對象做爲key來輸出時,就必需要實現WritableComparable接口,重寫其中的compareTo()方法。

    部分排序:對最終輸出的沒一個文件進行內部排序。

    全排序:對全部數據進行排序,一般只有一個Reduce。

    二次排序:排序的條件有兩個。

 

5)Combiner合併

Combiner合併能夠提升程序執行效率,減小io傳輸。可是使用時必須不能影響原有的業務處理結果。

 

6)reduce端分組:Groupingcomparator

    reduceTask拿到輸入數據(一個partition的全部數據)後,首先須要對數據進行分組,其分組的默認原則是key相同,而後對每一組kv數據調用一次reduce()方法,而且將這一組kv中的第一個kv的key做爲參數傳給reduce的key,將這一組數據的value的迭代器傳給reduce()的values參數。

    利用上述這個機制,咱們能夠實現一個高效的分組取最大值的邏輯。

    自定義一個bean對象用來封裝咱們的數據,而後改寫其compareTo方法產生倒序排序的效果。而後自定義一個Groupingcomparator,將bean對象的分組邏輯改爲按照咱們的業務分組id來分組(好比訂單號)。這樣,咱們要取的最大值就是reduce()方法中傳進來key。

 

7)邏輯處理接口:Reducer

    用戶根據業務需求實現其中三個方法:  reduce()   setup()   cleanup ()

 

8)輸出數據接口:OutputFormat

    默認實現類是TextOutputFormat,功能邏輯是:將每個KV對向目標文本文件中輸出爲一行。

用戶還能夠自定義OutputFormat。

相關文章
相關標籤/搜索