MapReduce分佈式編程模型

大數據技術之HadoopMapReducehtml

MapReduce入門

1.1 MapReduce定義

Mapreduce是一個分佈式運算程序的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架。前端

Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。java

1.2 MapReduce優缺點

1.2.1 優勢

1MapReduce 易於編程它簡單的實現一些接口,就能夠完成一個分佈式程序,這個分佈式程序能夠分佈到大量廉價的PC機器上運行。也就是說你寫一個分佈式程序,跟寫一個簡單的串行程序是如出一轍的。就是由於這個特色使得MapReduce編程變得很是流行。node

2)良好的擴展性當你的計算資源不能獲得知足的時候,你能夠經過簡單的增長機器來擴展它的計算能力。linux

3高容錯性MapReduce設計的初衷就是使程序可以部署在廉價的PC機器上,這就要求它具備很高的容錯性。好比其中一臺機器掛了,它能夠把上面的計算任務轉移到另一個節點上運行,不至於這個任務運行失敗,並且這個過程不須要人工參與,而徹底是由 Hadoop內部完成的。git

4)適合PB級以上海量數據的離線處理這裏加紅字體離線處理,說明它適合離線處理而不適合在線處理。好比像毫秒級別的返回一個結果,MapReduce很難作到。github

1.2.2 缺點

MapReduce擅長作實時計算、流式計算、DAG有向圖計算。web

1)實時計算。MapReduce沒法像Mysql同樣,在毫秒或者秒級內返回結果。算法

2)流式計算。流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是由於MapReduce自身的設計特色決定了數據源必須是靜態的。spring

3DAG(有向圖)計算。多個應用程序存在依賴關係,後一個應用程序的輸入爲前一個的輸出。在這種狀況下,MapReduce並非不能作,而是使用後,每一個MapReduce做業的輸出結果都會寫入到磁盤,會形成大量的磁盤IO,致使性能很是的低下。

1.3 MapReduce核心思想

 

 

1)分佈式的運算程序每每須要分紅至少2個階段。

2)第一個階段的maptask併發實例,徹底並行運行,互不相干。

3)第二個階段的reduce task併發實例互不相干,可是他們的數據依賴於上一個階段的全部maptask併發實例的輸出。

4)MapReduce編程模型只能包含一個map階段和一個reduce階段,若是用戶的業務邏輯很是複雜,那就只能多個mapreduce程序,串行運行。

1.4 MapReduce進程

一個完整的mapreduce程序在分佈式運行時有三類實例進程:

1MrAppMaster:負責整個程序的過程調度及狀態協調。

2MapTask:負責map階段的整個數據處理流程。

3ReduceTask:負責reduce階段的整個數據處理流程。

1.5 MapReduce編程規範

用戶編寫的程序分紅三個部分:MapperReducerDriver(提交運行mr程序的客戶端)

1Mapper階段

1)用戶自定義的Mapper要繼承本身的父類

(2Mapper的輸入數據是KV對的形式(KV的類型可自定義)

(3Mapper中的業務邏輯寫在map()方法中

(4Mapper的輸出數據是KV對的形式(KV的類型可自定義)

(5map()方法(maptask進程)對每個<K,V>調用一次

2Reducer階段

1)用戶自定義的Reducer要繼承本身的父類

(2Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV

(3Reducer的業務邏輯寫在reduce()方法中

(4Reducetask進程對每一組相同k<k,v>組調用一次reduce()方法

3Driver階段

整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象

Hadoop序列化

2.1 爲何要序列化?

        通常來講,「活的」對象只生存在內存裏,關機斷電就沒有了。並且「活的」對象只能由本地的進程使用,不能被髮送到網絡上的另一臺計算機。 然而序列化能夠存儲「活的」對象,能夠將「活的」對象發送到遠程計算機。

2.2 什麼是序列化?

序列化就是把內存中的對象,轉換成字節序列(或其餘數據傳輸協議)以便於存儲(持久化)和網絡傳輸。 

反序列化就是將收到字節序列(或其餘數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。

2.3 爲何不用Java的序列化?

        Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。

2.4 爲何序列化對Hadoop很重要?

         由於Hadoop在集羣之間進行通信或者RPC調用的時候,須要序列化,並且要求序列化要快,且體積要小,佔用帶寬要小。因此必須理解Hadoop的序列化機制。

        序列化和反序列化在分佈式數據處理領域常常出現:進程通訊和永久存儲。然而Hadoop中各個節點的通訊是經過遠程調用(RPC)實現的,那麼RPC序列化要求具備如下特色:

1)緊湊:緊湊的格式能讓咱們充分利用網絡帶寬,而帶寬是數據中心最稀缺的資

2)快速:進程通訊造成了分佈式系統的骨架,因此須要儘可能減小序列化和反序列化的性能開銷,這是基本的;

3)可擴展:協議爲了知足新的需求變化,因此控制客戶端和服務器過程當中,須要直接引進相應的協議,這些是新協議,原序列化方式能支持新的協議報文;

4)互操做:能支持不一樣語言寫的客戶端和服務端進行交互; 

2.5 經常使用數據序列化類型

經常使用的數據類型對應的hadoop數據序列化類型

Java類型

Hadoop Writable類型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

2.6 自定義bean對象實現序列化接口Writable

1自定義bean對象要想序列化傳輸,必須實現序列化接口須要注意如下7

1)必須實現Writable接口

2反序列化時,須要反射調用空參構造函數,因此必須有空參構造

public FlowBean() {

super();

}

3)重寫序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

4)重寫反序列化方法

@Override

public void readFields(DataInput in) throws IOException {

upFlow = in.readLong();

downFlow = in.readLong();

sumFlow = in.readLong();

}

5注意反序列化的順序和序列化的順序徹底一致

6)要想把結果顯示在文件中,須要重寫toString(),可」\t」分開,方便後續用

7)若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序。

@Override

public int compareTo(FlowBean o) {

// 倒序排列,從大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

MapReduce框架原理

3.1 MapReduce工做流程

1)流程示意圖

 

 

 

2)流程詳解

上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從7步開始16結束,具體shuffle過程詳解以下:

1maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中

2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件

3)多個溢出文件會被合併成大的溢出文件

4)在溢出過程當中,及合併的過程當中,都要調用partitioner進行分區和針對key進行排序

5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據

6reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)

7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)

3注意

Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。

緩衝區的大小能夠經過參數調整,參數:io.sort.mb  默認100M

3.2 InputFormat數據輸入

3.2.1 Job提交流程和切片源碼詳解

1job提交流程源碼詳解

 

waitForCompletion()

submit();

// 1創建鏈接

connect();

// 1建立提交job代理

new Cluster(getConfiguration());

// (1)判斷是本地yarn仍是遠程

initialize(jobTrackAddr, conf);

// 2 提交job

submitter.submitJobInternal(Job.this, cluster)

// 1)建立給集羣提交數據的Stag路徑

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)獲取jobid ,並建立job路徑

JobID jobId = submitClient.getNewJobID();

// 3)拷貝jar包到集羣

copyAndConfigureFiles(job, submitJobDir);

rUploader.uploadFiles(job, jobSubmitDir);

// 4)計算切片,生成切片規劃文件

writeSplits(job, submitJobDir);

maps = writeNewSplits(job, jobSubmitDir);

input.getSplits(job);

// 5Stag路徑xml配置文件

writeConf(conf, submitJobFile);

conf.writeXml(out);

// 6提交job,返回提交狀態

status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2)FileInputFormat源碼解析(input.getSplits(job))

1)找到你數據存儲的目錄

2)開始遍歷處理(規劃切片)目錄下的每個文件

3)遍歷第一個文件ss.txt

a)獲取文件大小fs.sizeOf(ss.txt);

b)計算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

c默認狀況下,切片大小=blocksize

d)開始切,造成第1個切片:ss.txt—0:128M 2個切片ss.txt—128:256M 3個切片ss.txt—256M:300M(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍不大於1.1就劃分一塊切片)

e)將切片信息寫到一個切片規劃文件中

f)整個切片核心過程在getSplit()方法中完成。

g)數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分紅分片進行存儲InputSplit只記錄了分片的元數據信息,好比起始位置長度以及所在的節點列表等。

h注意:blockHDFS物理上存儲的數據,切片是對數據邏輯上的劃分。

4)提交切片規劃文件到yarn,yarn的MrAppMaster就能夠根據切片規劃文件計算開啓maptask個數。

3.2.2 FileInputFormat切片機制

1FileInputFormat中默認的切片機制:

1)簡單地按照文件的內容長度進行切片

2)切片大小,默認等於block大小

3)切片時不考慮數據集總體,而是逐個針對每個文件單獨切片

好比待處理數據有兩個文件:

file1.txt    320M

file2.txt    10M

通過FileInputFormat的切片機制運算後,造成的切片信息以下:  

file1.txt.split1--  0~128

file1.txt.split2--  128~256

file1.txt.split3--  256~320

file2.txt.split1--  0~10M

2)FileInputFormat切片大小的參數配置

經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 

切片主要由這幾個值來運算決定

mapreduce.input.fileinputformat.split.minsize=1 默認值爲1

mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認Long.MAXValue

所以,默認狀況下,切片大小=blocksize

maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值。

minsize(切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。

3)獲取切片信息API

// 根據文件類型獲取切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();

// 獲取切片的文件名稱

String name = inputSplit.getPath().getName();

3.2.3 CombineTextInputFormat切片機制

關於大量小文件的優化策略

1)默認狀況下TextInputformat對任務的切片機制是按文件規劃切片無論文件多小都會是一個單獨的切都會交給一個maptask,這樣若是有大量小文件產生大量的maptask處理效率極其低下。

2)優化策略

1)最好的辦法,在數據處理系統的最前端(預處理/採集將小文件先合併成大文件,再上傳到HDFS作後續分析。

2)補救措施若是已是大量小文件在HDFS中了,能夠使用另外一種InputFormat來作切片(CombineTextInputFormat,它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件能夠交給一個maptask。

3優先知足最小切片大小,不超過最大切片大小

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

舉例0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具體實現步驟

//  若是不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class)

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

4)案例實操

3.2.4 InputFormat接口實現

MapReduce任務的輸入文件通常是存儲在HDFS裏面。輸入的文件格式包括:基於行的日誌文件、二進制格式文件等。這些文件通常會很大,達到數十GB,甚至更大。那麼MapReduce是如何讀取這些數據的呢?下面咱們首先學習InputFormat接口。

InputFormat常見接口實現類包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat和自定義InputFormat等

1TextInputFormat

TextInputFormat是默認的InputFormat。每條記錄是一行輸入。鍵是LongWritable類型,存儲該行在整個文件中的字節偏移量。值是這行的內容,不包括任何行終止符(換行符和回車符)。

如下是一個示例,好比,一個分片包含了以下4條文本記錄。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

每條記錄表示爲如下鍵/值對:

(0,Rich learning form)

(19,Intelligent learning engine)

(47,Learning more convenient)

(72,From the real demand for more close to the enterprise)

很明顯,鍵並非行號。通常狀況下,很難取得行號,由於文件按字節而不是按行切分爲分片。

2KeyValueTextInputFormat

每一行均爲一條記錄,被分隔符分割爲keyvalue。能夠經過驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來設定分隔符。默認分隔符是tab\t)。

如下是一個示例,輸入是一個包含4條記錄的分片。其中——>表示一個(水平方向的)製表符。

line1 ——>Rich learning form

line2 ——>Intelligent learning engine

line3 ——>Learning more convenient

line4 ——>From the real demand for more close to the enterprise

每條記錄表示爲如下鍵/值對:

(line1,Rich learning form)

(line2,Intelligent learning engine)

(line3,Learning more convenient)

(line4,From the real demand for more close to the enterprise)

 此時的鍵是每行排在製表符以前的Text序列。

 3NLineInputFormat

若是使用NlineInputFormat,表明每一個map進程處理的InputSplit再也不按block塊去劃分,而是NlineInputFormat指定的行數N來劃分。即輸入文件的總行數/N=切片(20),若是不整除,切片=+1

如下是一個示例,仍然以上面的4行輸入爲例。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

 例如,若是N2,則每一個輸入分片包含兩行。開啓2maptask。

(0,Rich learning form)

(19,Intelligent learning engine)

另外一個 mapper 則收到後兩行:

(47,Learning more convenient)

(72,From the real demand for more close to the enterprise)

        這裏的鍵和值與TextInputFormat生成的同樣。

3.2.5 自定義InputFormat

1概述

1)自定義一個類繼承FileInputFormat

2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV

3)在輸出時使用SequenceFileOutPutFormat輸出合併文件。

2)案例實操

詳見7.4小文件處理(自定義InputFormat

3.3 MapTask工做機制

3.3.1 並行度決定機制

1問題引出

maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼mapTask並行任務是否越多越好呢?

2MapTask並行度決定機制

一個jobmap階段MapTask並行度個數由客戶端提交job時的切片個數決定

 

3.3.2 MapTask工做機制

 

1Read階段:Map Task經過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。

2Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value。

3Collect收集階段:在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()輸出結果。在函數內部,它會生成的key/value分區調用Partitioner並寫入一個環形內存緩衝區中。

4Spill階段:即「溢」,當環形緩衝區滿後,MapReduce將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要對數據進行合併壓縮等操做

寫階段詳情:

步驟1利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,而後按照key進行排序。這樣通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照key有序。

 

步驟2按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.outN表示當前溢寫次數)中。若是用戶設置了Combiner,則寫入文件以前,對每一個分區中數據進行一次彙集操做。

步驟3將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index

5Combine階段:當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。

全部數據處理完後,MapTask會將全部臨時文件合併成一個大文件保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。

進行文件合併過程當中,MapTask以分區爲單位進行合併。對於某個分區,將採用多輪遞歸合併的方式每輪合併io.sort.factor(默認100)個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。

每一個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

3.4 Shuffle機制

3.4.1 Shuffle機制

Mapreduce確保每一個reducer的輸入都是按鍵排序的。系統執行排序的過程(即map輸出做爲輸入傳給reducer稱爲shuffle。

 

 

3.4.2 Partition分區

0問題引出:要求將統計結果按照條件輸出到不一樣文件中(分區)。好比:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)

1默認partition分區

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value, int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }

}

默認分區是根據keyhashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區

2自定義Partitioner步驟

1)自定義類繼承PartitionergetPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

 

@Override

public int getPartition(Text key, FlowBean value, int numPartitions) {

 

// 1 獲取電話號碼的前三位

String preNum = key.toString().substring(0, 3);

 

partition = 4;

 

// 2 判斷是哪一個省

if ("136".equals(preNum)) {

partition = 0;

}else if ("137".equals(preNum)) {

partition = 1;

}else if ("138".equals(preNum)) {

partition = 2;

}else if ("139".equals(preNum)) {

partition = 3;

}

return partition;

}

}

(2)job驅動中,設置自定義partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定義partition後,要根據自定義partitioner的邏輯設置相應數量的reduce task

job.setNumReduceTasks(5);

3)注意:

若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx

若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception

若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000

例如假設自定義分區數爲5

1job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件

2job.setNumReduceTasks(2);會報錯

3job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件

4案例實操

詳見7.2.2 需求2:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(Partitioner

詳見7.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner

3.4.3 WritableComparable排序

排序是MapReduce框架中最重要的操做之一。Map Task和Reduce Task均會對數據(按照key進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。默認排序是按照字典順序排序,且實現該排序的方法是快速排序。

對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。

對於Reduce Task,它從每一個Map Task上遠程拷貝相應數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。若是磁盤上文件數目達到必定閾值則進行一次合併以生成一個更大文件若是內存中文件大小或者數目超過必定閾值,則進行一次合併後數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。

每一個階段的默認排序

1)排序的分類:

1部分排序:

MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每一個文件內部排序。

2全排序:

如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失MapReduce所提供的並行架構。

替代方案:首先建立一系列排好序的文件;其次,串聯這些文件;最後,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g第二分區記錄單詞首字母h-n, 分區記錄單詞首字母o-z。

3)輔助排序:GroupingComparator分組)

Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。

4)二次排序:

自定義排序過程當中若是compareTo中的判斷條件爲兩個即爲二次排序。

2)自定義排序WritableComparable

1)原理分析

bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序

@Override

public int compareTo(FlowBean o) {

// 倒序排列,從大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

2案例實操

詳見7.2.3 需求3:將統計結果按照總流量倒序排序(排序)

詳見7.2.4 需求4:不一樣省份輸出文件內部排序(部分排序)

3.4.4 GroupingComparator分組(輔助排序

1)對reduce階段的數據根據某一個幾個字段進行分組

2)案例實操

詳見7.3 求出每個訂單中最貴的商品GroupingComparator

3.4.5 Combiner合併

0在分佈式的架構中,分佈式文件系統HDFS,和分佈式運算程序編程框架mapreduce。

HDFS:不怕大文件,怕不少小文件

mapreduce :怕數據傾斜

那麼mapreduce是若是解決多個小文件的問題呢?

mapreduce關於大量小文件的優化策略

1) 默認狀況下,TextInputFormat對任務的切片機制是按照文件規劃切片,無論有多少個小文件,都會是單獨的切片,都會交給一個maptask,這樣,若是有大量的小文件

就會產生大量的maptask,處理效率極端底下

2)優化策略

最好的方法:在數據處理的最前端(預處理、採集),就將小文件合併成大文件,在上傳到HDFS作後續的分析

補救措施:若是已是大量的小文件在HDFS中了,可使用另外一種inputformat來作切片(CombineFileInputformat),它的切片邏輯跟TextInputformat

注:CombineTextInputFormat是CombineFileInputformat的子類

不一樣:

它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask了

//若是不設置InputFormat,它默認的用的是TextInputFormat.class

/*CombineTextInputFormat爲系統自帶的組件類

 * setMinInputSplitSize 中的2048是表示n個小文件之和不能大於2048

 * setMaxInputSplitSize 中的4096是     當知足setMinInputSplitSize中的2048狀況下  在知足n+1個小文件之和不能大於4096

 */

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMinInputSplitSize(job, 2048);

CombineTextInputFormat.setMaxInputSplitSize(job, 4096);

1)輸入數據:準備5小文件

2)實現過程

1不作任何處理,運行需求1wordcount程序,觀察切片個數爲5

2WordcountDriver中增長以下代碼運行程序,並觀察運行的切片個數爲1

// 若是不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m

 

注:在看number of splits時,和最大值(MaxSplitSize)有關、整體規律就是和低於最大值是一片、高於最大值1.5+,則爲兩片;高於最大值2倍以上則向下取整,好比文件大小65MB,切片最大值爲4MB,那麼切片爲16.整體來講,切片差值不超過1個,不影響總體性能

 

6)自定義Combiner實現步驟:

1)自定義一個combiner繼承Reducer,重寫reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

        // 1 彙總操做

int count = 0;

for(IntWritable v :values){

count = v.get();

}

        // 2 寫出

context.write(key, new IntWritable(count));

}

}

2)在job驅動中設置:  

job.setCombinerClass(WordcountCombiner.class);

3.5 ReduceTask工做機制

1設置ReduceTask並行度(個數)

reducetask的並行度一樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不一樣,Reducetask數量的決定是能夠直接手動設置:

//默認值是1,手動設置爲4

job.setNumReduceTasks(4);

2注意

1reducetask=0 ,表示沒有reduce階段,輸出文件個數和map個數一致。

2reducetask默認值就是1,因此輸出文件個數爲一個。

(3)若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜

(4reducetask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1reducetask

(5)具體多少reducetask,須要根據集羣性能而定。

(6)若是分區數不是1可是reducetask1是否執行分區過程。答案:不執行分區過程。由於在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1大於1確定不執行。

3實驗:測試reducetask多少合適。

1)實驗環境:1個master節點,16slave節點:CPU:8GHZ,內存: 2G

2)實驗結論:

1 改變reduce task (數據量1GB)

Map task =16

Reduce task

1

5

10

15

16

20

25

30

45

60

總時間

892

146

110

92

88

100

128

101

145

104

4ReduceTask工做機制

 

1Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中。

2Merge階段:在遠程拷貝數據的同時,ReduceTask啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。

3Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行彙集的一組數據。爲了key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。

4Reduce階段:reduce()函數將計算結果寫到HDFS上。

3.6 OutputFormat數據輸出

3.6.1 OutputFormat接口實現類

 OutputFormatMapReduce輸出的基類,全部實現MapReduce輸出都實現了 OutputFormat接口。下面咱們介紹幾種常見的OutputFormat實現

1)文本輸出TextOutputFormat

        默認的輸出格式是TextOutputFormat,它把每條記錄寫爲文本行。它的鍵和值能夠是任意類型,由於TextOutputFormat調用toString()方法把它們轉換爲字符串。

2SequenceFileOutputFormat

 SequenceFileOutputFormat將它的輸出寫爲一個順序文件。若是輸出須要做爲後續 MapReduce任務的輸入,這即是一種好的輸出格式,由於它的格式緊湊,很容易被壓縮。

3自定義OutputFormat

根據用戶需求,自定義實現輸出。

3.6.2 自定義OutputFormat

爲了實現控制最終文件的輸出路徑,能夠自定義OutputFormat。

要在一個mapreduce程序中根據數據的不一樣輸出兩類結果到不一樣目錄這類靈活的輸出需求能夠經過自定義outputformat來實現

1自定義OutputFormat步驟

1)自定義一個類繼承FileOutputFormat。

2)改寫recordwriter,具體改寫輸出數據的方法write()

2實操案例:

詳見7.5 修改日誌內容及自定義日誌輸出路徑(自定義OutputFormat)。

3.7 Join多種應用

3.7.1 Reduce join

1)原理

Map端的主要工做:爲來自不一樣表(文件)key/value對打標籤以區別不一樣來源的記錄。而後用鏈接字段做爲key,其他部分和新加的標誌做爲value,最後進行輸出。

Reduce端的主要工做:在reduce端以鏈接字段做爲key的分組已經完成,咱們只須要在每個分組當中將那些來源於不一樣文件的記錄(map階段已經打標誌)分開,最後進行合併就ok了。

2)該方法的缺點

這種方式的缺點很明顯就是會形成mapreduce端也就是shuffle階段出現大量的數據傳輸,效率很低。

3)案例實操

詳見7.6.1 需求1reduce端表合併(數據傾斜

3.7.2 Map joinDistributedcache分佈式緩存

1)使用場景:一張表十分小、一張表很大。

2)解決方案

map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。

3)具體辦法:採用distributedcache

1)在mappersetup階段,將文件讀取到緩存集合中

2)在驅動函數中加載緩存。

job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節點

4)實操案例:

詳見7.6.2需求2map端表合併(Distributedcache

3.8 數據清洗(ETL

1概述

運行核心業務Mapreduce程序以前,每每要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程每每只須要運行mapper程序,不須要運行reduce程序。

2)實操案例

詳見7.7 日誌清洗(數據清洗)。

3.9 計數器應用

Hadoop爲每一個做業維護若干內置計數器,以描述多項指標。例如某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量。

1API

1)採用枚舉的方式統計計數

enum MyCounter{MALFORORMED,NORMAL}

//對枚舉定義的自定義計數器加1

context.getCounter(MyCounter.MALFORORMED).increment(1);

2)採用計數器組、計數器名稱方式統計

context.getCounter("counterGroup", "countera").increment(1);

名和計數器名稱隨便起,但最好有意義。

3)計數結果在程序運行後的控制檯上查看。

2)案例實操

詳見7.7 日誌清洗(數據清洗)。

3.10 MapReduce開發總結

在編寫mapreduce程序時,須要考慮的幾個方面

1)輸入數據接口:InputFormat

   默認使用的實現類是:TextInputFormat

   TextInputFormat的功能邏輯是:一次讀一行文本,而後將該行的起始偏移量做爲key,行內容做爲value返回。

KeyValueTextInputFormat每一行均爲一條記錄,被分隔符分割爲keyvalue。默認分隔符是tab\t)。

NlineInputFormat按照指定的行數N來劃分切片

CombineTextInputFormat能夠把多個小文件合併成一個切片處理提升處理效率。

用戶還能夠自定義InputFormat。

2)邏輯處理接口:Mapper  

   用戶根據業務需求實現其中三個方法map()   setup()   cleanup ()

3Partitioner分區

有默認實現 HashPartitioner,邏輯是根據key哈希值numReduces來返回一個分區號;key.hashCode()&Integer.MAXVALUE % numReduces

若是業務上有特別的需求,能夠自定義分區。

4Comparable排序

當咱們用自定義的對象做爲key來輸出時,就必需要實現WritableComparable接口,重寫其中的compareTo()方法。

部分排序:對最終輸出的一個文件進行內部排序。

排序對全部數據進行排序,一般只有一個Reduce

二次排序:排序的條件有兩個

5Combiner合併

Combiner合併能夠提升程序執行效率,減小io傳輸。可是使用時必須不能影響原有的業務處理結果。

6reduce端分組:Groupingcomparator

reduceTask拿到輸入數據(一個partition的全部數據)後,首先須要對數據進行分組,其分組的默認原則是key相同,而後對每一組kv數據調用一次reduce()方法,而且將這一組kv中的第一個kvkey做爲參數傳給reducekey,將這一組數據的value的迭代器傳給reduce()values參數。

利用上述這個機制,咱們能夠實現一個高效的分組取最大值的邏輯。

自定義一個bean對象用來封裝咱們的數據,而後改寫其compareTo方法產生倒序排序的效果。而後自定義一個Groupingcomparator,將bean對象的分組邏輯改爲按照咱們的業務分組id來分組(好比訂單號)。這樣,咱們要取的最大值就是reduce()方法中傳進來key

7)邏輯處理接口:Reducer

用戶根據業務需求實現其中三個方法reduce()   setup()   cleanup ()

8)輸出數據接口:OutputFormat

默認實現類是TextOutputFormat,功能邏輯是:將每個KV對向目標文本文件中輸出爲一行。

 SequenceFileOutputFormat將它的輸出寫爲一個順序文件。若是輸出須要做爲後續 MapReduce任務的輸入,這即是一種好的輸出格式,由於它的格式緊湊,很容易被壓縮。

用戶還能夠自定義OutputFormat。

Hadoop數據壓縮

4.1 概述

壓縮技術可以有效減小底層存儲系統(HDFS讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在Hadoop下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下,I/O操做和網絡數據傳輸要花大量的時間。還有,ShuffleMerge過程一樣也面臨着巨大的I/O壓力

鑑於磁盤I/O網絡帶寬是Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤I/O網絡傳輸很是有幫助。不過儘管壓縮與解壓操做CPU開銷不高,其性能的提高和資源的節省並不是沒有代價。

若是磁盤I/O和網絡帶寬影響了MapReduce做業性能,在任意MapReduce階段啓用壓縮均可以改善端到端處理時間並減小I/O和網絡流量。

壓縮Mapreduce的一種優化策略:經過壓縮編碼對Mapper或者Reducer的輸出進行壓縮,以減小磁盤IO提升MR程序運行速度(但相應增長了cpu運算負擔)。

注意壓縮特性運用得當能提升性能,但運用不當也可能下降性能。

基本原則:

1)運算密集型的job,少用壓縮

2IO密集型的job,多用壓縮

4.2 MR支持的壓縮編碼

壓縮格式

hadoop自帶?

算法

文件擴展名

是否可切分

換成壓縮格式後,原來的程序是否須要修改

DEFAULT

是,直接使用

DEFAULT

.deflate

和文本處理同樣,不須要修改

Gzip

是,直接使用

DEFAULT

.gz

和文本處理同樣,不須要修改

bzip2

是,直接使用

bzip2

.bz2

和文本處理同樣,不須要修改

LZO

否,須要安裝

LZO

.lzo

須要建索引,還須要指定輸入格式

Snappy

否,須要安裝

Snappy

.snappy

和文本處理同樣,不須要修改

爲了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,以下表所示

壓縮格式

對應的編碼/解碼器

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

LZO

com.hadoop.compression.lzo.LzopCodec

Snappy

org.apache.hadoop.io.compress.SnappyCodec

壓縮性能的比較

壓縮算法

原始文件大小

壓縮文件大小

壓縮速度

解壓速度

gzip

8.3GB

1.8GB

17.5MB/s

58MB/s

bzip2

8.3GB

1.1GB

2.4MB/s

9.5MB/s

LZO

8.3GB

2.9GB

49.3MB/s

74.6MB/s

http://google.github.io/snappy/

On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

4.3 壓縮方式選擇

4.3.1 Gzip壓縮

優勢:壓縮率比較高,並且壓縮/解壓速度也比較快;hadoop自己支持,在應用中處理gzip格式的文件就和直接處理文本同樣;大部分linux系統都自帶gzip命令,使用方便。

缺點:不支持split

應用場景:當每一個文件壓縮以後在130M之內的(1個塊大小內),均可以考慮用gzip壓縮格式。例如說一天或者一個小時的日誌壓縮成一個gzip文件,運行mapreduce程序的時候經過多個gzip文件達到併發。hive程序,streaming程序,和java寫的mapreduce程序徹底和文本處理同樣,壓縮以後原來的程序不須要作任何修改。

4.3.2 Bzip2壓縮

優勢:支持split;具備很高的壓縮率,比gzip壓縮率都高;hadoop自己支持,但不支持native(javac互操做的API接口);在linux系統下自帶bzip2命令,使用方便。

缺點:壓縮/解壓速度慢;不支持native

應用場景:適合對速度要求不高,但須要較高的壓縮率的時候,能夠做爲mapreduce做業的輸出格式;或者輸出以後的數據比較大,處理以後的數據須要壓縮存檔減小磁盤空間而且之後數據用得比較少的狀況;或者對單個很大的文本文件想壓縮減小存儲空間,同時又須要支持split,並且兼容以前的應用程序(即應用程序不須要修改)的狀況

4.3.3 Lzo壓縮

優勢:壓縮/解壓速度也比較快,合理的壓縮率;支持split,是hadoop中最流行的壓縮格式;能夠在linux系統下安裝lzop命令,使用方便。

缺點:壓縮率比gzip要低一些;hadoop自己不支持,須要安裝;在應用中對lzo格式的文件須要作一些特殊處理(爲了支持split須要建索引,還須要指定inputformatlzo格式)。

應用場景:一個很大的文本文件,壓縮以後還大於200M以上的能夠考慮,並且單個文件越大,lzo優勢越越明顯。

4.3.4 Snappy壓縮

優勢:高速壓縮速度和合理的壓縮率。

缺點:不支持split;壓縮率比gzip要低;hadoop自己不支持,須要安裝; 

應用場景:當Mapreduce做業的Map輸出的數據比較大的時候,做爲MapReduce的中間數據的壓縮格式;或者做爲一個Mapreduce做業的輸出和另一個Mapreduce做業的輸入。

4.4 壓縮位置選擇

壓縮能夠在MapReduce做用的任意階段啓用。

4.5 壓縮配置參數

要在Hadoop中啓用壓縮,能夠配置以下參數:

參數

默認值

階段

建議

io.compression.codecs   

(在core-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec

 

輸入壓縮

Hadoop使用文件擴展名判斷是否支持某種編解碼器

mapreduce.map.output.compress(在mapred-site.xml中配置)

false

mapper輸出

這個參數設爲true啓用壓縮

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

mapper輸出

使用LZOsnappy編解碼器在此階段壓縮數據

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

false

reducer輸出

這個參數設爲true啓用壓縮

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress. DefaultCodec

reducer輸出

使用標準工具或者編解碼器,如gzipbzip2

mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)

RECORD

reducer輸出

SequenceFile輸出使用的壓縮類型:NONEBLOCK

4.6 壓縮實戰

壓縮案例詳見7.10 壓縮/解壓縮

Yarn

5.1 Hadoop1.xHadoop2.x架構區別

Hadoop1.x時代,Hadoop中的MapReduce同時處理業務邏輯運算和資源的調度,耦合性較大。

Hadoop2.x時代增長Yarn。Yarn只負責資源的調度,MapReduce只負責運算

5.2 Yarn概述

Yarn是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而MapReduce等運算程序則至關於運行於操做系統之上的應用程序。

5.3 Yarn基本架構

  YARN主要由ResourceManagerNodeManagerApplicationMasterContainer等組件構成。

 

5.4 Yarn工做機制

1)Yarn運行機制

 

2)工做機制詳解

(0)Mr程序提交到客戶端所在的節點。

(1)YarnrunnerResourcemanager申請一個Application

(2rm將該應用程序的資源路徑返回給yarnrunner。

(3)該程序將運行所需資源提交到HDFS

(4)程序資源提交完畢後,申請運行mrAppMaster。

(5RM將用戶的請求初始化成一個task。

(6)其中一個NodeManager領取task任務。

(7)該NodeManager建立容器Container併產生MRAppmaster。

(8ContainerHDFS上拷貝資源到本地

(9)MRAppmaster向RM 申請運行maptask資源。

10RM運行maptask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務建立容器。

11MR向兩個接收到任務的NodeManager發送程序啓動腳本這兩個NodeManager分別啓動maptask,maptask對數據分區排序。

12MrAppMaster等待全部maptask運行完畢後,向RM申請容器,運行reduce task

13reduce taskmaptask獲取相應分區的數據。

14)程序運行完畢後,MR會向RM申請註銷本身。

5.5 做業提交全過程

做業提交全過程詳解

1做業提交

0client調用job.waitForCompletion方法,向整個集羣提交MapReduce做業。

1步:clientRM申請一個做業id

2步:RMclient返回該job資源的提交路徑和做業id

3client提交jar包、切片信息和配置文件到指定的資源提交路徑。

4步:client提交完資源後,向RM申請運行MrAppMaster

2做業初始化

5步:RM收到client的請求後,將job添加到容量調度器中。

6一個空閒的NM領取到該job

7步:NM建立Container併產生MRAppmaster

8:下載client提交的資源到本地。

3任務分配

9MrAppMasterRM申請運行多個maptask任務資源。

10RM運行maptask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務建立容器。

4任務運行

11MR向兩個接收到任務的NodeManager發送程序啓動腳本這兩個NodeManager分別啓動maptask,maptask對數據分區排序。

12MrAppMaster等待全部maptask運行完畢後,向RM申請容器,運行reduce task

13reduce taskmaptask獲取相應分區的數據。

14程序運行完畢後,MR會向RM申請註銷本身。

5進度和狀態更新

YARN中的任務將其進度和狀態(包括counter)返回給應用管理器, 客戶端每秒(經過mapreduce.client.progressmonitor.pollinterval設置)嚮應用管理器請求進度更新, 展現給用戶。

6做業完成

除了嚮應用管理器請求做業進度外, 客戶端每5分鐘都會經過調用waitForCompletion()來檢查做業是否完成時間間隔能夠經過mapreduce.client.completion.pollinterval來設置做業完成以後, 應用管理器和container會清理工做狀態做業的信息會被做業歷史服務器存儲以備以後用戶覈查

5.6 資源調度器

目前,Hadoop做業調度器主要有三種:FIFOCapacity SchedulerFair SchedulerHadoop2.7.2默認的資源調度器是Capacity Scheduler。

具體設置詳見:yarn-default.xml文件

<property>

    <description>The class to use as the resource scheduler.</description>

    <name>yarn.resourcemanager.scheduler.class</name>

<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>

</property>

1) 先進先出調度器(FIFO

 

優勢:調度算法簡單,JobTracker工做負擔輕。

缺點:忽略了不一樣做業的需求差別。例如若是相似對海量數據進行統計分析的做業長期佔據計算資源,那麼在其後提交的交互型做業有可能遲遲得不處處理,從而影響到用戶的體驗。

2) 容量調度器(Capacity Scheduler===>Yahoo開發

 

1.多隊列支持,每一個隊列採用FIFO

2.爲了防止同一個用戶的做業獨佔隊列中的資源,該調度器會對同一個用戶提交多的做業所佔資源量進行限定

3.首先,計算每一個隊列中正在運行的任務數與其應該分得的計算資源之間的比值,選擇一個該比值最小的隊列

4.其次,根據做業的優先級和提交時間順序,同時考慮用戶資源量限制和內存限制對隊列內任務排序

5.三個隊列同時按照任務的前後順序依次執行,好比,job1job21job31分別排在隊列最前面,是最早運行,也是同時運行

 

該調度默認狀況下不支持優先級,可是能夠在配置文件中開啓此選項,若是支持優先級,調度算法就是帶有優先級的FIFO

不支持優先級搶佔,一旦一個做業開始執行,在執行完以前它的資源不會被高優先級做業所搶佔。

對隊列中同一用戶提交的做業可以得到的資源百分比進行了限制以使同屬於一用戶的做業不能出現獨佔資源的狀況。

 

3)公平調度器(Fair Scheduler===>Facebook開發

 

1.支持多隊列多用戶,每一個隊列中的資源量能夠配置,同一個隊列中的做業公平共享隊列中全部資源

2.好比有三個隊列ABC.每一個隊列中的job按照優先級分配資源,優先級越高分配的資源越多,可是每一個job都分配到資源以確保公平。在資源有限的狀況下,每一個job理想狀況下,得到的計算資源與實際得到的計算資源存在一種差距,這個差距叫作缺額。同一個隊列,job的資源缺額越大,越先得到的資源優先執行,做業是按照缺額的高低來前後執行的,並且能夠看到上圖有多個做業同時運行

 

5.7 任務推測執行

推測執行(Speculative Execution)是指在集羣環境下運行MapReduce,多是程序Bug,負載不均或者其餘的一些問題,致使在一個JOB下的多個TASK速度不一致,好比有的任務已經完成,可是有些任務可能只跑了10%,根據木桶原理,這些任務將成爲整個JOB的短板,若是集羣啓動了推測執行,這時爲了最大限度的提升短板,Hadoop會爲該task啓動備份任務,讓speculative task與原始task同時處理一份數據,哪一個先運行完,則將誰的結果做爲最終結果,而且在運行完成後Kill掉另一個任務。

 

1)做業完成時間取決於最慢的任務完成時間

一個做業由若干個Map任務和Reduce任務構成。因硬件老化、軟件Bug等,某些任務可能運行很是慢。

典型案例:系統中有99%Map任務都完成了,只有少數幾個Map總是進度很慢,完不成,怎麼辦?

2)推測執行機制:

發現拖後腿的任務,好比某個任務運行速度遠慢於任務平均速度。爲拖後腿任務啓動一個備份任務,同時運行。誰先運行完,則採用誰的結果。

3)執行推測任務的前提條件

1每一個task只能有一個備份任務

2當前job已完成的task必須不小於0.055%

3開啓推測執行參數設置,mapred-site.xml文件中默認是打開的。

<property>

  <name>mapreduce.map.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some map tasks                may be executed in parallel.</description>

</property>

 

<property>

  <name>mapreduce.reduce.speculative</name>

  <value>true</value>

  <description>If true, then multiple instances of some reduce tasks

               may be executed in parallel.</description>

</property>

4)不能啓用推測執行機制狀況

   1)任務間存在嚴重的負載傾斜;

   2)特殊任務,好比任務向數據庫中寫數據。

Hadoop企業優化

6.1 MapReduce 跑的慢的緣由

Mapreduce 程序效率的瓶頸在於兩點:

1)計算機性能

CPU、內存、磁盤健康、網絡

2I/O 操做優化

1)數據傾斜

2mapreduce數設置不合理

3map運行時間太長,致使reduce等待太久

4)小文件過多

5大量的不可分塊的超大文件

6spill次數過多

7merge次數過多等。

6.2 MapReduce優化方法

MapReduce優化方法主要從六個方面考慮:數據輸入、Map階段Reduce階段、IO傳輸、數據傾斜問題和經常使用的調優參數。

6.2.1 數據輸入

1)合併小文件:在執行mr任務前將小文件進行合併,大量的小文件會產生大量的map任務,增大map任務裝載次數,而任務的裝載比較耗時,從而致使mr運行較慢。

2)採用CombineTextInputFormat來做爲輸入,解決輸入端大量小文件場景。

6.2.2 Map階段

1)減小溢寫spill次數:經過調整io.sort.mbsort.spill.percent參數值,增大觸發spill的內存上限,減小spill次數,從而減小磁盤IO

2)減小合併merge次數:經過調整io.sort.factor參數,增大merge的文件數目,減小merge的次數,從而縮短mr處理時間。

3)在map以後,不影響業務邏輯前提下,先進行combine處理,減小 I/O

6.2.3 Reduce階段

1)合理設置mapreduce:兩個都不能設置太少,也不能設置太多。太少,會致使task等待,延長處理時間;太多,會致使 mapreduce任務間競爭資源,形成處理超時等錯誤。

2)設置mapreduce共存:調整slowstart.completedmaps參數,使map運行到必定程度後,reduce也開始運行,減小reduce的等待時間。

3規避使用reduce由於reduce在用於鏈接數據集的時候將會產生大量的網絡消耗。

4)合理設置reduce端的buffer默認狀況下,數據達到一個閾值的時候,buffer中的數據就會寫入磁盤,而後reduce會從磁盤中得到全部的數據。也就是說,bufferreduce是沒有直接關聯的,中間多個一個寫磁盤->讀磁盤的過程,既然有這個弊端,那麼就能夠經過參數來配置,使得buffer中的一部分數據能夠直接輸送到reduce,從而減小IO開銷:mapred.job.reduce.input.buffer.percent,默認爲0.0。當值大於0的時候,會保留指定比例的內存讀buffer中的數據直接拿給reduce使用。這樣一來,設置buffer須要內存,讀取數據須要內存,reduce計算也要內存,因此要根據做業的運行狀況進行調整。

6.2.4 IO傳輸

1採用數據壓縮的方式,減小網絡IO的時間。安裝SnappyLZO壓縮編碼器

2使用SequenceFile二進制文件

6.2.5 數據傾斜問題

1)數據傾斜現象

數據頻率傾斜——某一個區域的數據量要遠遠大於其餘區域。

數據大小傾斜——部分記錄的大小遠遠大於平均值。

2)如何收集傾斜數據

reduce方法中加入記錄map輸出鍵的詳細狀況的功能

public static final String MAX_VALUES = "skew.maxvalues";

private int maxValueThreshold;

 

@Override

public void configure(JobConf job) {

     maxValueThreshold = job.getInt(MAX_VALUES, 100);

}

@Override

public void reduce(Text key, Iterator<Text> values,

                     OutputCollector<Text, Text> output,

                     Reporter reporter) throws IOException {

     int i = 0;

     while (values.hasNext()) {

         values.next();

         i++;

     }

 

     if (++i > maxValueThreshold) {

         log.info("Received " + i + " values for key " + key);

     }

}

3)減小數據傾斜方法

方法1:抽樣和範圍分區

能夠經過對原始數據進行抽樣獲得的結果集來預設分區邊界值。

方法2:自定義分區

基於輸出鍵的背景知識進行自定義分區。例如,若是map輸出鍵的單詞來源於一本書。且其中某幾個專業詞彙較多。那麼就能夠自定義分區將這這些專業詞彙發送給固定的一部分reduce實例。而將其餘的都發送給剩餘的reduce實例。

方法3Combine

使用Combine能夠大量地減少數據傾斜。在可能的狀況下,combine的目的就是聚合並精簡數據。

方法4採用Map Join儘可能避免Reduce Join

6.2.6 經常使用的調優參數

1資源相關參數

(1)如下參數是在用戶本身的mr應用程序中配置就能夠生效(mapred-default.xml)

配置參數

參數說明

mapreduce.map.memory.mb

一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。

mapreduce.reduce.memory.mb

一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。

mapreduce.map.cpu.vcores

每一個Map task可以使用的最多cpu core數目,默認值: 1

mapreduce.reduce.cpu.vcores

每一個Reduce task可以使用的最多cpu core數目,默認值: 1

mapreduce.reduce.shuffle.parallelcopies

每一個reducemap中拿數據的並行數。默認值是5

mapreduce.reduce.shuffle.merge.percent

buffer中的數據達到多少比例開始寫入磁盤。默認值0.66

mapreduce.reduce.shuffle.input.buffer.percent

buffer大小佔reduce可用內存的比例。默認值0.7

mapreduce.reduce.input.buffer.percent

指定多少比例的內存用來存放buffer中的數據,默認值是0.0

(2)應該在yarn啓動以前就配置在服務器的配置文件中才能生效(yarn-default.xml)

配置參數

參數說明

yarn.scheduler.minimum-allocation-mb   1024

給應用程序container分配的最小內存

yarn.scheduler.maximum-allocation-mb   8192

給應用程序container分配的最大內存

yarn.scheduler.minimum-allocation-vcores 1

每一個container申請的最小CPU核數

yarn.scheduler.maximum-allocation-vcores 32

每一個container申請的最大CPU核數

yarn.nodemanager.resource.memory-mb   8192

給containers分配的最大物理內存

(3shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好(mapred-default.xml)

配置參數

參數說明

mapreduce.task.io.sort.mb   100

shuffle的環形緩衝區大小,默認100m

mapreduce.map.sort.spill.percent   0.8

環形緩衝區溢出的閾值,默認80%

2)容錯相關參數(mapreduce性能優化)

配置參數

參數說明

mapreduce.map.maxattempts

每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4

mapreduce.reduce.maxattempts

每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4

mapreduce.task.timeout

Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是600000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。

6.3 HDFS文件優化方法

6.3.1 HDFS小文件弊端

HDFS上每一個文件都要在namenode上創建一個索引,這個索引的大小約爲150byte,這樣當小文件比較多的時候,就會產生不少的索引文件,一方面會大量佔用namenode的內存空間,另外一方面就是索引文件過大是的索引速度變慢。

6.3.2 解決方案

1Hadoop Archive:

 是一個高效地將小文件放入HDFS塊中的文件存檔工具,它可以將多個小文件打包成一個HAR文件,這樣減小了namenode的內存使用。

2Sequence file

 sequence file由一系列的二進制key/value組成,若是key文件名,value爲文件內容,則能夠將大批小文件合併成一個大文件。

3CombineFileInputFormat

  CombineFileInputFormat是一種新的inputformat,用於將多個文件合併成一個單獨的split,另外,它會考慮數據的存儲位置。

4)開啓JVM重用

對於大量小文件Job能夠開啓JVM重用減小45%運行時間

JVM重用理解:一個map運行一個jvm,重用的話,在一個mapjvm上運行完畢後,jvm繼續運行其餘map

具體設置mapreduce.job.jvm.numtasks值10-20之間

MapReduce實戰

7.1 WordCount案例

7.1.1 需求1統計一堆文件中單詞出現的個數

0)需求:在一堆給定的文本文件中統計輸出每個單詞出現的總次數

1)數據準備

 

2)分析

按照mapreduce編程規範,分別編寫MapperReducerDriver

 

 

3)編寫程序

1)編寫mapper

package com.itstar.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

 

Text k = new Text();

IntWritable v = new IntWritable(1);

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取一行

String line = value.toString();

 

// 2 切割

String[] words = line.split(" ");

 

// 3 輸出

for (String word : words) {

 

k.set(word);

context.write(k, v);

}

}

}

2)編寫reducer

package com.itstar.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

 

@Override

protected void reduce(Text key, Iterable<IntWritable> value,

Context context) throws IOException, InterruptedException {

 

// 1 累加求和

int sum = 0;

for (IntWritable count : value) {

sum += count.get();

}

 

// 2 輸出

context.write(key, new IntWritable(sum));

}

}

3)編寫驅動類

package com.itstar.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordcountDriver {

 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

        String[] args=new String{「輸入路徑」,」輸出路徑」};

 

// 1 獲取配置信息

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 2 設置jar加載路徑

job.setJarByClass(WordcountDriver.class);

 

// 3 設置mapReduce

job.setMapperClass(WordcountMapper.class);

job.setReducerClass(WordcountReducer.class);

 

// 4 設置map輸出

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

 

// 5 設置Reduce輸出

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

 

// 6 設置輸入和輸出路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 提交

boolean result = job.waitForCompletion(true);

 

System.exit(result ? 0 : 1);

}

}

4)集羣上測試

(1)將程序打成jar包,而後拷貝hadoop集羣中

(2)啓動hadoop集羣

(3)執行wordcount程序

[itstar@hadoop102 software]$ hadoop jar  wc.jar com.itstar.wordcount.WordcountDriver /user/itstar/input /user/itstar/output1

5)本地測試

1)在windows環境上配置HADOOP_HOME環境變量。

2)在idea上運行程序

3)注意:若是idea打印不出日誌,在控制檯上只顯示

1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).  

2.log4j:WARN Please initialize the log4j system properly.  

3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

須要在項目的src目錄下,新建一個文件,命名爲「log4j.properties」在文件中填入

log4j.rootLogger=INFO, stdout  

log4j.appender.stdout=org.apache.log4j.ConsoleAppender  

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  

log4j.appender.logfile=org.apache.log4j.FileAppender  

log4j.appender.logfile.File=target/spring.log  

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

7.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner

0分析

1自定義分區

package com.itstar.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{

 

@Override

public int getPartition(Text key, IntWritable value, int numPartitions) {

 

// 1 獲取單詞key  

String firWord = key.toString().substring(0, 1);

//String -> int

        int result = Integer.valueOf(firWord);

 

// 2 根據奇數偶數分區

if (result % 2 == 0) {

return 0;

}else {

return 1;

}

}

}

2)在驅動中配置加載分區,設置reducetask個數

job.setPartitionerClass(WordCountPartitioner.class);

job.setNumReduceTasks(2);

7.1.3 需求3:對每個maptask的輸出局部彙總(Combiner

0)需求:統計過程對每個maptask的輸出進行局部彙總,以減少網絡傳輸量即採用Combiner功能。

 

1)數據準備

 

方案一

1增長一個WordcountCombiner類繼承Reducer

package com.itstar.mr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

 

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

        // 1 彙總

int count = 0;

for(IntWritable v :values){

count += v.get();

}

// 2 寫出

context.write(key, new IntWritable(count));

}

}

2WordcountDriver驅動類中指定combiner

// 9 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯

job.setCombinerClass(WordcountCombiner.class);

方案

1)將WordcountReducer做爲combinerWordcountDriver驅動類中指定

// 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯

job.setCombinerClass(WordcountReducer.class);

 

運行程序

 

 

7.1.4 需求4:大量小文件切片優化(CombineTextInputFormat

0在分佈式的架構中,分佈式文件系統HDFS,和分佈式運算程序編程框架mapreduce。

HDFS:不怕大文件,怕不少小文件

mapreduce :怕數據傾斜

那麼mapreduce是若是解決多個小文件的問題呢?

mapreduce關於大量小文件的優化策略

1) 默認狀況下,TextInputFormat對任務的切片機制是按照文件規劃切片,無論有多少個小文件,都會是單獨的切片,都會交給一個maptask,這樣,若是有大量的小文件

就會產生大量的maptask,處理效率極端底下

2)優化策略

最好的方法:在數據處理的最前端(預處理、採集),就將小文件合併成大文件,在上傳到HDFS作後續的分析

補救措施:若是已是大量的小文件在HDFS中了,可使用另外一種inputformat來作切片(CombineFileInputformat),它的切片邏輯跟TextInputformat

注:combineTextInputFormat是CombineFileInputformat的子類

不一樣:

它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask了

//若是不設置InputFormat,它默認的用的是TextInputFormat.class

/*CombineTextInputFormat爲系統自帶的組件類

 * setMinInputSplitSize 中的2048是表示n個小文件之和不能大於2048

 * setMaxInputSplitSize 中的4096是     當知足setMinInputSplitSize中的2048狀況下  在知足n+1個小文件之和不能大於4096

 */

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMinInputSplitSize(job, 2048);

CombineTextInputFormat.setMaxInputSplitSize(job, 4096);

1)輸入數據:準備5小文件

2)實現過程

1不作任何處理,運行需求1wordcount程序,觀察切片個數爲5

 

2WordcountDriver中增長以下代碼運行程序,並觀察運行的切片個數爲1

// 若是不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

 

注:

文件大小 <  MinSplit  < MaxSplit

number of splits:1

 

 

MinSplit  <  文件大小 < MaxSplit

number of splits:1

 

 

MaxSplit < 文件大小 < 2*MaxSplit

number of splits:2

 

 

2 * MaxSplit < 文件大小

number of splits:3

 

 

測試大小

最大MB

文件大小和最大值倍數

Splits

4.97MB

3MB

1.65

2

4.1MB

3MB

1.36

1

6.51

3MB

2.17

3

 

 

7.2 流量彙總案例

7.2.1 需求1:統計手機號耗費的總上行流量、下行流量、總流量(序列化)

1需求:

統計每個手機號耗費的總上行流量、下行流量、總流量

2)數據準備

 

輸入數據格式:

數據格式:時間戳、電話號碼、基站的物理地址、訪問網址的ip、網站域名、數據包、接包數、上行/傳流量、下行/載流量、響應碼

 

輸出數據格式

1356·0436666 1116       954 2070

手機號碼 上行流量        下行流量 總流量

3)分析

基本思路:

Map階段:

1)讀取一行數據,切分字段

2)抽取手機號、上行流量、下行流量

3)以手機key,bean對象爲value輸出context.write(手機號,bean);

Reduce階段:

1)累加上行流量和下行流量獲得總流量。

(2)實現自定義的bean來封裝流量信息,並將bean做爲map輸出的key來傳輸

(3MR程序在處理數據的過程當中會對數據排序(map輸出的kv對傳輸到reduce以前,會排序),排序的依據是map輸出的key

因此,咱們若是要實現本身須要的排序規則,則能夠考慮將排序因素放到key中,key實現接口:WritableComparable

而後重寫keycompareTo方法。

4)編寫mapreduce程序

1)編寫流量統計的bean對象

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

 

// 1 實現writable接口

public class FlowBean implements Writable{

 

private long upFlow ;

private long downFlow;

private long sumFlow;

 

//2  反序列化時,須要反射調用空參構造函數,因此必須有

public FlowBean() {

super();

}

 

public FlowBean(long upFlow, long downFlow) {

super();

this.upFlow = upFlow;

this.downFlow = downFlow;

this.sumFlow = upFlow + downFlow;

}

 

//3  寫序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

 

//4 反序列化方法

//5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致

@Override

public void readFields(DataInput in) throws IOException {

this.upFlow  = in.readLong();

this.downFlow = in.readLong();

this.sumFlow = in.readLong();

}

 

// 6 編寫toString方法,方便後續打印到文本

@Override

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow;

}

 

public long getUpFlow() {

return upFlow;

}

 

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

}

 

public long getDownFlow() {

return downFlow;

}

 

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

}

 

public long getSumFlow() {

return sumFlow;

}

 

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

}

 

}

2)編寫mapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

 

FlowBean v = new FlowBean();

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取一行

String line = value.toString();

 

// 2 切割字段

String[] fields = line.split("\t");

 

// 3 封裝對象

// 取出手機號碼

String phoneNum = fields[1];

// 取出上行流量和下行流量

long upFlow = Long.parseLong(fields[fields.length - 3]);

long downFlow = Long.parseLong(fields[fields.length - 2]);

 

v.set(downFlow, upFlow);

 

// 4 寫出

context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));

}

}

(3)編寫reducer

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

 

@Override

protected void reduce(Text key, Iterable<FlowBean> values, Context context)

throws IOException, InterruptedException {

 

long sum_upFlow = 0;

long sum_downFlow = 0;

 

// 1 遍歷所用bean,將其中的上行流量,下行流量分別累加

for (FlowBean flowBean : values) {

sum_upFlow += flowBean.getSumFlow();

sum_downFlow += flowBean.getDownFlow();

}

 

// 2 封裝對象

FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

 

// 3 寫出

context.write(key, resultBean);

}

}

(4)編寫驅動

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowsumDriver {

 

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

 

// 1 獲取配置信息,或者job對象實例

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 6 指定本程序的jar包所在的本地路徑

job.setJarByClass(FlowsumDriver.class);

 

// 2 指定本業務job要使用的mapper/Reducer業務類

job.setMapperClass(FlowCountMapper.class);

job.setReducerClass(FlowCountReducer.class);

 

// 3 指定mapper輸出數據的kv類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

 

// 4 指定最終輸出的數據的kv類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

 

// 5 指定job的輸入原始文件所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

7.2.2 需求2:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(Partitioner

0)需求將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)

1數據準備

 

2分析

1Mapreduce中會將map輸出的kv對,按照相同key分組,而後分發給不一樣的reducetask。默認的分發規則爲:根據keyhashcode%reducetask數來分發

2)若是要按照咱們本身的需求進行分組,則須要改寫數據分發(分組)組件Partitioner

自定義一個CustomPartitioner繼承抽象類:Partitioner

3)在job驅動中,設置自定義partitionerjob.setPartitionerClass(CustomPartitioner.class)

3)在需求1基礎上,增長一個分區類

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

 

@Override

public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 獲取電話號碼的前三位

String preNum = key.toString().substring(0, 3);

//注:若是設置的分區數小於下面的分區數,如3、則最後一個分區混數據分區

        //注:如何設置的分區數大於下面的分區數,如5,則報錯

int partition = 4;

 

// 2 判斷是哪一個省

if ("136".equals(preNum)) {

partition = 0;

}else if ("137".equals(preNum)) {

partition = 1;

}else if ("138".equals(preNum)) {

partition = 2;

}else if ("139".equals(preNum)) {

partition = 3;

}

 

return partition;

}

}

2)在驅動函數中增長自定義數據分區設置和reduce task設置

package com.itstar.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowsumDriver {

 

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

 

// 1 獲取配置信息,或者job對象實例

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 6 指定本程序的jar包所在的本地路徑

job.setJarByClass(FlowsumDriver.class);

 

// 2 指定本業務job要使用的mapper/Reducer業務類

job.setMapperClass(FlowCountMapper.class);

job.setReducerClass(FlowCountReducer.class);

 

// 3 指定mapper輸出數據的kv類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

 

// 4 指定最終輸出的數據的kv類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

 

// 8 指定自定義數據分區

job.setPartitionerClass(ProvincePartitioner.class);

// 9 同時指定相應數量的reduce task

job.setNumReduceTasks(5);

 

// 5 指定job的輸入原始文件所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

7.2.3 需求3:將統計結果按照總流量倒序排序(全排序)

0)需求

根據需求1產生的結果再次對總流量進行排序。

1)數據準備

 

2)分析

1)把程序分兩步走,第一步正常統計總流量第二步再把結果進行排序

2context.write(總流量,手機號)

3FlowBean實現WritableComparable接口重寫compareTo方法

@Override

public int compareTo(FlowBean o) {

// 倒序排列,從大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

3代碼實現

1FlowBean對象在在需求1基礎上增長了比較功能

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

 

public class FlowBean implements WritableComparable<FlowBean> {

 

private long upFlow;

private long downFlow;

private long sumFlow;

 

// 反序列化時,須要反射調用空參構造函數,因此必須有

public FlowBean() {

super();

}

 

public FlowBean(long upFlow, long downFlow) {

super();

this.upFlow = upFlow;

this.downFlow = downFlow;

this.sumFlow = upFlow + downFlow;

}

 

public void set(long upFlow, long downFlow) {

this.upFlow = upFlow;

this.downFlow = downFlow;

this.sumFlow = upFlow + downFlow;

}

 

public long getSumFlow() {

return sumFlow;

}

 

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

}

 

public long getUpFlow() {

return upFlow;

}

 

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

}

 

public long getDownFlow() {

return downFlow;

}

 

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

}

 

/**

 * 序列化方法

 * @param out

 * @throws IOException

 */

@Override

public void write(DataOutput out) throws IOException {

out.writeLong(upFlow);

out.writeLong(downFlow);

out.writeLong(sumFlow);

}

 

/**

 * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致

 * @param in

 * @throws IOException

 */

@Override

public void readFields(DataInput in) throws IOException {

upFlow = in.readLong();

downFlow = in.readLong();

sumFlow = in.readLong();

}

 

@Override

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow;

}

 

@Override

public int compareTo(FlowBean o) {

// 倒序排列,從大到小

return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

}

2)編寫mapper

package com.itstar.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

FlowBean k= new FlowBean();

Text v = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取一行

String line = value.toString();

 

// 2 截取

String[] fields = line.split("\t");

 

// 3 封裝對象

String phoneNbr = fields[0];

long upFlow = Long.parseLong(fields[1]);

long downFlow = Long.parseLong(fields[2]);

 

k.set(upFlow, downFlow);

v.set(phoneNbr);

 

// 4 輸出

context.write(k, v);

}

}

(3)編寫reducer

package com.itstar.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

 

@Override

protected void reduce(FlowBean key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

 

// 循環輸出,避免總流量相同狀況

for (Text text : values) {

context.write(text, key);

}

}

}

(4)編寫driver

package com.itstar.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowCountSortDriver {

 

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

 

// 1 獲取配置信息,或者job對象實例

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 6 指定本程序的jar包所在的本地路徑

job.setJarByClass(FlowCountSortDriver.class);

 

// 2 指定本業務job要使用的mapper/Reducer業務類

job.setMapperClass(FlowCountSortMapper.class);

job.setReducerClass(FlowCountSortReducer.class);

 

// 3 指定mapper輸出數據的kv類型

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(Text.class);

 

// 4 指定最終輸出的數據的kv類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

 

// 5 指定job的輸入原始文件所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

 

7.2.4 需求4:不一樣省份輸出文件內部排序(部分排序)

1)需求

要求每一個省份手機號輸出的文件中按照總流量內部排序。

2)分析:

基於需求3增長自定義分區類便可。

3)案例實操

1)增長自定義分區類

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

 

@Override

public int getPartition(FlowBean key, Text value, int numPartitions) {

 

// 1 獲取手機號碼前三位

String preNum = value.toString().substring(0, 3);

 

int partition = 4;

 

// 2 根據手機號歸屬地設置分區

if ("136".equals(preNum)) {

partition = 0;

}else if ("137".equals(preNum)) {

partition = 1;

}else if ("138".equals(preNum)) {

partition = 2;

}else if ("139".equals(preNum)) {

partition = 3;

}

 

return partition;

}

}

(2)在驅動類中添加分區類

// 加載自定義分區類

job.setPartitionerClass(FlowSortPartitioner.class);

// 設置Reducetask個數

job.setNumReduceTasks(5);

7.3 輔助排序和二次排序案例(GroupingComparator

1)需求

有以下訂單數據

訂單id

商品id

成交金額

0000001

Pdt_01

222.8

0000001

Pdt_06

25.8

0000002

Pdt_03

522.8

0000002

Pdt_04

122.4

0000002

Pdt_05

722.4

0000003

Pdt_01

222.8

0000003

Pdt_02

33.8

如今須要求出每個訂單中最貴的商品。

2輸入數據

 

輸出數據預期:

    

3)分析

1)利用「訂單id和成交金額」做爲key,能夠將map階段讀取到的全部訂單數據按照id分區,按照金額排序,發送到reduce

2)在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,而後取第一個便是最大值。

 

4)代碼實現

1)定義訂單信息OrderBean

import lombok.AllArgsConstructor;

import lombok.Getter;

import lombok.NoArgsConstructor;

import lombok.Setter;

import org.apache.hadoop.io.WritableComparable;

 

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

@Getter

@Setter

@AllArgsConstructor

@NoArgsConstructor

public class OrderBean implements WritableComparable<OrderBean> {

    // 訂單id

    private int order_id;

    // 價格

    private double price;

 

    @Override

    public String toString() {

        return order_id + "\t" + price;

    }

 

    @Override

    public int compareTo(OrderBean o) {

        int result;

 

        if (this.order_id > o.getOrder_id()) {

            result = 1;

        } else if (this.order_id < o.getOrder_id()) {

            result = -1;

        } else {

            result = this.price > o.getPrice() ? -1 : 1;

        }

        return result;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(order_id);

        out.writeDouble(price);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        in.readInt();

        in.readDouble();

    }

}

2編寫OrderSortMapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

OrderBean k = new OrderBean();

 

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

// 1 獲取一行

String line = value.toString();

 

// 2 截取

String[] fields = line.split("\t");

 

// 3 封裝對象

k.setOrder_id(Integer.parseInt(fields[0]));

k.setPrice(Double.parseDouble(fields[2]));

 

// 4 寫出

context.write(k, NullWritable.get());

}

}

3編寫OrderSortPartitioner

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {

 

@Override

public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {

 

return (key.getOrder_id() & Integer.MAX_VALUE) % numReduceTasks;

}

}

4)編寫OrderSortGroupingComparator

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

public class OrderGroupingComparator extends WritableComparator {

 

protected OrderGroupingComparator() {

super(OrderBean.class, true);

}

 

@SuppressWarnings("rawtypes")

@Override

public int compare(WritableComparable a, WritableComparable b) {

 

OrderBean aBean = (OrderBean) a;

OrderBean bBean = (OrderBean) b;

 

int result;

if (aBean.getOrder_id() > bBean.getOrder_id()) {

result = 1;

} else if (aBean.getOrder_id() < bBean.getOrder_id()) {

result = -1;

} else {

result = 0;

}

 

return result;

}

}

5編寫OrderSortReducer

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

 

@Override

protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)

throws IOException, InterruptedException {

 

context.write(key, NullWritable.get());

}

}

6編寫OrderSortDriver

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OrderDriver {

 

public static void main(String[] args) throws Exception {

 

// 1 獲取配置信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

 

// 2 設置jar包加載路徑

job.setJarByClass(OrderDriver.class);

 

// 3 加載map/reduce

job.setMapperClass(OrderMapper.class);

job.setReducerClass(OrderReducer.class);

 

// 4 設置map輸出數據keyvalue類型

job.setMapOutputKeyClass(OrderBean.class);

job.setMapOutputValueClass(NullWritable.class);

 

// 5 設置最終輸出數據的keyvalue類型

job.setOutputKeyClass(OrderBean.class);

job.setOutputValueClass(NullWritable.class);

 

// 6 設置輸入數據和輸出數據路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 10 設置reduce端的分組

job.setGroupingComparatorClass(OrderGroupingComparator.class);

 

// 7 設置分區

job.setPartitionerClass(OrderPartitioner.class);

 

// 8 設置reduce個數

job.setNumReduceTasks(3);

 

// 9 提交

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

7.4 小文件處理案例(自定義InputFormat

1需求

不管hdfs仍是mapreduce,對於小文件都有損效率,實踐中,又不免面臨處理大量小文件的場景,此時,就須要有相應解決方案。多個小文件合併成一個文件SequenceFileSequenceFile裏面存儲着多個文件,存儲的形式爲文件路徑+名稱爲key文件內容爲value

2輸入數據

          

最終預期文件格式:

 

3分析

小文件的優化無非如下幾種方式:

1)在數據採集的時候,就將小文件或小批數據合成大文件再上傳HDFS

2業務處理以前HDFS上使用mapreduce程序對小文件進行合併

3)在mapreduce處理時,可採用CombineTextInputFormat提升效率

4具體實現

本節採用自定義InputFormat方式,處理輸入小文件的問題。

1)自定義一個類繼承FileInputFormat

2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV

3)在輸出時使用SequenceFileOutPutFormat輸出合併文件

5)程序實現:

1)自定義InputFromat

package com.itstar.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

 

// 定義類繼承FileInputFormat

public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{

 

@Override

protected boolean isSplitable(JobContext context, Path filename) {

return false;

}

 

@Override

public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)

throws IOException, InterruptedException {

 

WholeRecordReader recordReader = new WholeRecordReader();

recordReader.initialize(split, context);

 

return recordReader;

}

}

2自定義RecordReader

package com.itstar.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable>{

 

private Configuration configuration;

private FileSplit split;

 

private boolean processed = false;

private BytesWritable value = new BytesWritable();

 

@Override

public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

 

this.split = (FileSplit)split;

configuration = context.getConfiguration();

}

 

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

 

if (!processed) {

// 1 定義緩存區

byte[] contents = new byte[(int)split.getLength()];

 

FileSystem fs = null;

FSDataInputStream fis = null;

 

try {

// 2 獲取文件系統

Path path = split.getPath();

fs = path.getFileSystem(configuration);

 

// 3 讀取數據

fis = fs.open(path);

 

// 4 讀取文件內容

IOUtils.readFully(fis, contents, 0, contents.length);

 

// 5 輸出文件內容

value.set(contents, 0, contents.length);

} catch (Exception e) {

 

}finally {

IOUtils.closeStream(fis);

}

 

processed = true;

 

return true;

}

 

return false;

}

 

@Override

public NullWritable getCurrentKey() throws IOException, InterruptedException {

return NullWritable.get();

}

 

@Override

public BytesWritable getCurrentValue() throws IOException, InterruptedException {

return value;

}

 

@Override

public float getProgress() throws IOException, InterruptedException {

return processed? 1:0;

}

 

@Override

public void close() throws IOException {

}

}

3SequenceFileMapper處理流程

package com.itstar.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{

 

Text k = new Text();

 

@Override

protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)

throws IOException, InterruptedException {

// 1 獲取文件切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();

// 2 獲取切片名稱

String name = inputSplit.getPath().toString();

// 3 設置key輸出

k.set(name);

}

 

@Override

protected void map(NullWritable key, BytesWritable value,

Context context)

throws IOException, InterruptedException {

 

context.write(k, value);

}

}

(4)SequenceFileReducer處理流程

package com.itstar.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

 

@Override

protected void reduce(Text key, Iterable<BytesWritable> values, Context context)

throws IOException, InterruptedException {

 

context.write(key, values.iterator().next());

}

}

(5)SequenceFileDriver處理流程

package com.itstar.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

 

public class SequenceFileDriver {

 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

args = new String[] { "e:/input/inputinputformat", "e:/output1" };

Configuration conf = new Configuration();

 

Job job = Job.getInstance(conf);

job.setJarByClass(SequenceFileDriver.class);

job.setMapperClass(SequenceFileMapper.class);

job.setReducerClass(SequenceFileReducer.class);

 

        // 設置輸入的inputFormat

job.setInputFormatClass(WholeFileInputformat.class);

        // 設置輸出的outputFormat

job.setOutputFormatClass(SequenceFileOutputFormat.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(BytesWritable.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(BytesWritable.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

boolean result = job.waitForCompletion(true);

 

System.exit(result ? 0 : 1);

}

}

 

7.5 過濾日誌自定義日誌輸出路徑案例(自定義OutputFormat

1需求

過濾輸入log日誌是否包含itstar

1)包含itstar的網站輸出e:/itstar.log

2)不包含itstar的網站輸出到e:/other.log

2輸入數據

 

輸出預期:

  

3)具體程序:

(1)自定義一個outputformat

package com.itstar.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

 

@Override

public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)

throws IOException, InterruptedException {

 

// 建立一個RecordWriter

return new FilterRecordWriter(job);

}

}

2)具體的寫數據RecordWriter

package com.itstar.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

 

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

FSDataOutputStream itstarOut = null;

FSDataOutputStream otherOut = null;

 

public FilterRecordWriter(TaskAttemptContext job) {

// 1 獲取文件系統

FileSystem fs;

 

try {

fs = FileSystem.get(job.getConfiguration());

 

// 2 建立輸出文件路徑

Path itstarPath = new Path("e:/itstar.log");

Path otherPath = new Path("e:/other.log");

 

// 3 建立輸出流

itstarOut = fs.create(itstarPath);

otherOut = fs.create(otherPath);

} catch (IOException e) {

e.printStackTrace();

}

}

 

@Override

public void write(Text key, NullWritable value) throws IOException, InterruptedException {

 

// 判斷是否包含「itstar」輸出到不一樣文件

if (key.toString().contains("itstar")) {

itstarOut.write(key.toString().getBytes());

} else {

otherOut.write(key.toString().getBytes());

}

}

 

@Override

public void close(TaskAttemptContext context) throws IOException, InterruptedException {

// 關閉資源

if (itstarOut != null) {

itstarOut.close();

}

 

if (otherOut != null) {

otherOut.close();

}

}

}

3)編寫FilterMapper

package com.itstar.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

 

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// 1 獲取一行

String line = value.toString();

 

k.set(line);

 

// 3 寫出

context.write(k, NullWritable.get());

}

}

(4)編寫FilterReducer

package com.itstar.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

 

@Override

protected void reduce(Text key, Iterable<NullWritable> values, Context context)

throws IOException, InterruptedException {

 

String k = key.toString();

k = k + "\r\n";

 

context.write(new Text(k), NullWritable.get());

}

}

(5)編寫FilterDriver

package com.itstar.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FilterDriver {

public static void main(String[] args) throws Exception {

 

args = new String[] { "e:/input/inputoutputformat", "e:/output2" };

 

Configuration conf = new Configuration();

 

Job job = Job.getInstance(conf);

 

job.setJarByClass(FilterDriver.class);

job.setMapperClass(FilterMapper.class);

job.setReducerClass(FilterReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(NullWritable.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

 

// 要將自定義的輸出格式組件設置到job

job.setOutputFormatClass(FilterOutputFormat.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

 

// 雖然咱們自定義了outputformat,可是由於咱們的outputformat繼承自fileoutputformat

// fileoutputformat要輸出一個_SUCCESS文件,因此,在這還得指定一個輸出目錄

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

 

7.6 MapReduce中多表合併案例

1)需求:

訂單數據表t_order

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

 

商品信息表t_product

pid

pname

01

小米

02

華爲

03

格力

 

商品信息表中數據根據商品pid合併到訂單數據表中。

最終數據形式:

id

pname

amount

1001

小米

1

1004

小米

4

1002

華爲

2

1005

華爲

5

1003

格力

3

1006

格力

6

7.6.1 需求1Reduce端表合併(數據傾斜

經過將關聯條件做爲map輸出的key,將兩表知足join條件的數據並攜帶數據所來源的文件信息,發往同一個reduce task,在reduce中進行數據的串聯。

 

1)建立商品和訂合併後的bean

package com.itstar.mapreduce.table;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

 

public class TableBean implements Writable {

private String order_id; // 訂單id

private String p_id; // 產品id

private int amount; // 產品數量

private String pname; // 產品名稱

private String flag;// 表的標記

 

public TableBean() {

super();

}

 

public TableBean(String order_id, String p_id, int amount, String pname, String flag) {

super();

this.order_id = order_id;

this.p_id = p_id;

this.amount = amount;

this.pname = pname;

this.flag = flag;

}

 

public String getFlag() {

return flag;

}

 

public void setFlag(String flag) {

this.flag = flag;

}

 

public String getOrder_id() {

return order_id;

}

 

public void setOrder_id(String order_id) {

this.order_id = order_id;

}

 

public String getP_id() {

return p_id;

}

 

public void setP_id(String p_id) {

this.p_id = p_id;

}

 

public int getAmount() {

return amount;

}

 

public void setAmount(int amount) {

this.amount = amount;

}

 

public String getPname() {

return pname;

}

 

public void setPname(String pname) {

this.pname = pname;

}

 

@Override

public void write(DataOutput out) throws IOException {

out.writeUTF(order_id);

out.writeUTF(p_id);

out.writeInt(amount);

out.writeUTF(pname);

out.writeUTF(flag);

}

 

@Override

public void readFields(DataInput in) throws IOException {

this.order_id = in.readUTF();

this.p_id = in.readUTF();

this.amount = in.readInt();

this.pname = in.readUTF();

this.flag = in.readUTF();

}

 

@Override

public String toString() {

return order_id + "\t" + pname + "\t" + amount + "\t" ;

}

}

2)編寫TableMapper程序

package com.itstar.mapreduce.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{

TableBean bean = new TableBean();

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取輸入文件類型

FileSplit split = (FileSplit) context.getInputSplit();

String name = split.getPath().getName();

 

// 2 獲取輸入數據

String line = value.toString();

 

// 3 不一樣文件分別處理

if (name.startsWith("order")) {// 訂單表處理

// 3.1 切割

String[] fields = line.split("\t");

 

// 3.2 封裝bean對象

bean.setOrder_id(fields[0]);

bean.setP_id(fields[1]);

bean.setAmount(Integer.parseInt(fields[2]));

bean.setPname("");

bean.setFlag("0");

 

k.set(fields[1]);

}else {// 產品表處理

// 3.3 切割

String[] fields = line.split("\t");

 

// 3.4 封裝bean對象

bean.setP_id(fields[0]);

bean.setPname(fields[1]);

bean.setFlag("1");

bean.setAmount(0);

bean.setOrder_id("");

 

k.set(fields[0]);

}

// 4 寫出

context.write(k, bean);

}

}

3)編寫TableReducer程序

package com.itstar.mapreduce.table;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

 

@Override

protected void reduce(Text key, Iterable<TableBean> values, Context context)

throws IOException, InterruptedException {

 

// 1準備存儲訂單的集合

ArrayList<TableBean> orderBeans = new ArrayList<>();

// 2 準備bean對象

TableBean pdBean = new TableBean();

 

for (TableBean bean : values) {

 

if ("0".equals(bean.getFlag())) {// 訂單表

// 拷貝傳遞過來的每條訂單數據到集合中

TableBean orderBean = new TableBean();

try {

BeanUtils.copyProperties(orderBean, bean);

} catch (Exception e) {

e.printStackTrace();

}

 

orderBeans.add(orderBean);

} else {// 產品表

try {

// 拷貝傳遞過來的產品表到內存中

BeanUtils.copyProperties(pdBean, bean);

} catch (Exception e) {

e.printStackTrace();

}

}

}

 

// 3 表的拼接

for(TableBean bean:orderBeans){

bean.setPname (pdBean.getPname());

 

// 4 數據寫出去

context.write(bean, NullWritable.get());

}

}

}

4)編寫TableDriver程序

package com.itstar.mapreduce.table;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TableDriver {

 

public static void main(String[] args) throws Exception {

// 1 獲取配置信息,或者job對象實例

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 2 指定本程序的jar包所在的本地路徑

job.setJarByClass(TableDriver.class);

 

// 3 指定本業務job要使用的mapper/Reducer業務類

job.setMapperClass(TableMapper.class);

job.setReducerClass(TableReducer.class);

 

// 4 指定mapper輸出數據的kv類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(TableBean.class);

 

// 5 指定最終輸出的數據的kv類型

job.setOutputKeyClass(TableBean.class);

job.setOutputValueClass(NullWritable.class);

 

// 6 指定job的輸入原始文件所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

3)運行程序查看結果

1001 小米 1

1001 小米 1

1002 華爲 2

1002 華爲 2

1003 格力 3

1003 格力 3

缺點:這種方式中,合併的操做是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜

解決方案: map端實現數據合併

7.6.2 需求2Map端表合併(Distributedcache

1)分析

適用於關聯表中有小表的情形;

能夠將小表分發到全部的map節點,這樣,map節點就能夠在本地對本身所讀到的大表數據進行合併並輸出最終結果,能夠大大提升合併操做的併發度,加快處理速度。

 

2)實操案例

1)先在驅動模塊中添加緩存文件

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class DistributedCacheDriver {

 

public static void main(String[] args) throws Exception {

// 1 獲取job信息

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 2 設置加載jar包路徑

job.setJarByClass(DistributedCacheDriver.class);

 

// 3 關聯map

job.setMapperClass(DistributedCacheMapper.class);

 

// 4 設置最終輸出數據類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

 

// 5 設置輸入輸出路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6 加載緩存數據

job.addCacheFile(new URI("file:///e:/inputcache/pd.txt"));

 

// 7 mapjoin的邏輯不須要reduce階段,設置reducetask數量爲0

job.setNumReduceTasks(0);

 

// 8 提交

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);

}

}

2)讀取緩存文件數據

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.HashMap;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

 

Map<String, String> pdMap = new HashMap<>();

 

@Override

protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)

throws IOException, InterruptedException {

 

// 1 獲取緩存的文件

BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"),"UTF-8"));

 

String line;

while(StringUtils.isNotEmpty(line = reader.readLine())){

// 2 切割

String[] fields = line.split("\t");

 

// 3 緩存數據到集合

pdMap.put(fields[0], fields[1]);

}

 

// 4 關流

reader.close();

}

 

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// 1 獲取一行

String line = value.toString();

 

// 2 截取

String[] fields = line.split("\t");

 

// 3 獲取產品id

String pId = fields[1];

 

// 4 獲取商品名稱

String pdName = pdMap.get(pId);

 

// 5 拼接

k.set(line + "\t"+ pdName);

 

// 6 寫出

context.write(k, NullWritable.get());

}

}

7.7 日誌清洗案例

7.7.1 簡單解析版

1)需求:

去除日誌中字段長度小於等於11的日誌。

2輸入數據

 

3)實現代碼:

(1)編寫LogMapper

package com.itstar.mapreduce.weblog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

 

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取1行數據

String line = value.toString();

 

// 2 解析日誌

boolean result = parseLog(line,context);

 

// 3 日誌不合法退出

if (!result) {

return;

}

 

// 4 設置key

k.set(line);

 

// 5 寫出數據

context.write(k, NullWritable.get());

}

 

// 2 解析日誌

private boolean parseLog(String line, Context context) {

// 1 截取

String[] fields = line.split(" ");

 

// 2 日誌長度大於11的爲合法

if (fields.length > 11) {

// 系統計數器

context.getCounter("map", "true").increment(1);

return true;

}else {

context.getCounter("map", "false").increment(1);

return false;

}

}

}

(2)編寫LogDriver

package com.itstar.mapreduce.weblog;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class LogDriver {

 

public static void main(String[] args) throws Exception {

 

        args = new String[] { "e:/input/inputlog", "e:/output1" };

 

// 1 獲取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

 

// 2 加載jar

job.setJarByClass(LogDriver.class);

 

// 3 關聯map

job.setMapperClass(LogMapper.class);

 

// 4 設置最終輸出類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

 

// 設置reducetask個數爲0

job.setNumReduceTasks(0);

 

// 5 設置輸入和輸出路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6 提交

job.waitForCompletion(true);

}

}

7.7.2 複雜解析版

1)需求:

web訪問日誌中的各字段識別切分

去除日誌中不合法的記錄

根據統計需求,生成各種訪問請求過濾數據

2輸入數據

 

3)實現代碼:

1)定義一個bean,用來記錄日誌數據中的各數據字段

package com.itstar.mapreduce.log;

 

public class LogBean {

private String remote_addr;// 記錄客戶端的ip地址

private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"

private String time_local;// 記錄訪問時間與時區

private String request;// 記錄請求的urlhttp協議

private String status;// 記錄請求狀態;成功是200

private String body_bytes_sent;// 記錄發送給客戶端文件主體內容大小

private String http_referer;// 用來記錄從那個頁面連接訪問過來的

private String http_user_agent;// 記錄客戶瀏覽器的相關信息

 

private boolean valid = true;// 判斷數據是否合法

 

public String getRemote_addr() {

return remote_addr;

}

 

public void setRemote_addr(String remote_addr) {

this.remote_addr = remote_addr;

}

 

public String getRemote_user() {

return remote_user;

}

 

public void setRemote_user(String remote_user) {

this.remote_user = remote_user;

}

 

public String getTime_local() {

return time_local;

}

 

public void setTime_local(String time_local) {

this.time_local = time_local;

}

 

public String getRequest() {

return request;

}

 

public void setRequest(String request) {

this.request = request;

}

 

public String getStatus() {

return status;

}

 

public void setStatus(String status) {

this.status = status;

}

 

public String getBody_bytes_sent() {

return body_bytes_sent;

}

 

public void setBody_bytes_sent(String body_bytes_sent) {

this.body_bytes_sent = body_bytes_sent;

}

 

public String getHttp_referer() {

return http_referer;

}

 

public void setHttp_referer(String http_referer) {

this.http_referer = http_referer;

}

 

public String getHttp_user_agent() {

return http_user_agent;

}

 

public void setHttp_user_agent(String http_user_agent) {

this.http_user_agent = http_user_agent;

}

 

public boolean isValid() {

return valid;

}

 

public void setValid(boolean valid) {

this.valid = valid;

}

 

@Override

public String toString() {

StringBuilder sb = new StringBuilder();

sb.append(this.valid);

sb.append("\001").append(this.remote_addr);

sb.append("\001").append(this.remote_user);

sb.append("\001").append(this.time_local);

sb.append("\001").append(this.request);

sb.append("\001").append(this.status);

sb.append("\001").append(this.body_bytes_sent);

sb.append("\001").append(this.http_referer);

sb.append("\001").append(this.http_user_agent);

 

return sb.toString();

}

}

2)編寫LogMapper程序

package com.itstar.mapreduce.log;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

Text k = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// 1 獲取1

String line = value.toString();

 

// 2 解析日誌是否合法

LogBean bean = pressLog(line);

 

if (!bean.isValid()) {

return;

}

 

k.set(bean.toString());

 

// 3 輸出

context.write(k, NullWritable.get());

}

 

// 解析日誌

private LogBean pressLog(String line) {

LogBean logBean = new LogBean();

 

// 1 截取

String[] fields = line.split(" ");

 

if (fields.length > 11) {

// 2封裝數據

logBean.setRemote_addr(fields[0]);

logBean.setRemote_user(fields[1]);

logBean.setTime_local(fields[3].substring(1));

logBean.setRequest(fields[6]);

logBean.setStatus(fields[8]);

logBean.setBody_bytes_sent(fields[9]);

logBean.setHttp_referer(fields[10]);

 

if (fields.length > 12) {

logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);

}else {

logBean.setHttp_user_agent(fields[11]);

}

 

// 大於400HTTP錯誤

if (Integer.parseInt(logBean.getStatus()) >= 400) {

logBean.setValid(false);

}

}else {

logBean.setValid(false);

}

 

return logBean;

}

}

(3)編寫LogDriver程序

package com.itstar.mapreduce.log;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class LogDriver {

public static void main(String[] args) throws Exception {

// 1 獲取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

 

// 2 加載jar

job.setJarByClass(LogDriver.class);

 

// 3 關聯map

job.setMapperClass(LogMapper.class);

 

// 4 設置最終輸出類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

 

// 5 設置輸入和輸出路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6 提交

job.waitForCompletion(true);

}

}

7.8 倒排索引案例(多job串聯

0)需求:有大量的文本(文檔、網頁),須要創建搜索索引

    

 

1)第一次預期輸出結果

itstar--a.txt 3

itstar--b.txt 2

itstar--c.txt 2

pingping--a.txt  1

pingping--b.txt 3

pingping--c.txt  1

ss--a.txt 2

ss--b.txt 1

ss--c.txt 1

2第二次預期輸出結果

itstar c.txt-->2 b.txt-->2 a.txt-->3

pingping c.txt-->1 b.txt-->3 a.txt-->1

ss c.txt-->1 b.txt-->1 a.txt-->2

1)第一次處理

1)第一次處理,編寫OneIndexMapper

package com.itstar.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class OneIndexMapper extends Mapper<LongWritable, Text, Text , IntWritable>{

 

String name;

Text k = new Text();

IntWritable v = new IntWritable();

 

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

// 獲取文件名稱

FileSplit split = (FileSplit) context.getInputSplit();

 

name = split.getPath().getName();

}

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// 1 獲取1

String line = value.toString();

 

// 2 切割

String[] fields = line.split(" ");

 

for (String word : fields) {

// 3 拼接

k.set(word+"--"+name);

v.set(1);

 

// 4 寫出

context.write(k, v);

}

}

}

(2)第一次處理,編寫OneIndexReducer

package com.itstar.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

 

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

 

int count = 0;

// 1 累加求和

for(IntWritable value: values){

count +=value.get();

}

 

// 2 寫出

context.write(key, new IntWritable(count));

}

}

(3)第一次處理,編寫OneIndexDriver

package com.itstar.mapreduce.index;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OneIndexDriver {

 

public static void main(String[] args) throws Exception {

 

args = new String[] { "e:/input/inputoneindex", "e:/output5" };

 

Configuration conf = new Configuration();

 

Job job = Job.getInstance(conf);

job.setJarByClass(OneIndexDriver.class);

 

job.setMapperClass(OneIndexMapper.class);

job.setReducerClass(OneIndexReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

job.waitForCompletion(true);

}

}

4)查看第一次輸出結果

itstar--a.txt 3

itstar--b.txt 2

itstar--c.txt 2

pingping--a.txt 1

pingping--b.txt 3

pingping--c.txt 1

ss--a.txt 2

ss--b.txt 1

ss--c.txt 1

2)第二次處理

1)第二次處理,編寫TwoIndexMapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

Text k = new Text();

Text v = new Text();

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

 

// 1 獲取1行數據

String line = value.toString();

 

// 2「--」切割

String[] fields = line.split("--");

 

k.set(fields[0]);

v.set(fields[1]);

 

// 3 輸出數據

context.write(k, v);

}

}

(2)第二次處理,編寫TwoIndexReducer

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

 

@Override

protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

// itstar a.txt 3

// itstar b.txt 2

// itstar c.txt 2

 

// itstar c.txt-->2 b.txt-->2 a.txt-->3

 

StringBuilder sb = new StringBuilder();

        // 1 拼接

for (Text value : values) {

sb.append(value.toString().replace("\t", "-->") + "\t");

}

// 2 寫出

context.write(key, new Text(sb.toString()));

}

}

(3)第二次處理,編寫TwoIndexDriver

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TwoIndexDriver {

 

public static void main(String[] args) throws Exception {

 

args = new String[] { "e:/input/inputtwoindex", "e:/output6" };

 

Configuration config = new Configuration();

Job job = Job.getInstance(config);

 

job.setJarByClass(TwoIndexDriver.class);

job.setMapperClass(TwoIndexMapper.class);

job.setReducerClass(TwoIndexReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

boolean result = job.waitForCompletion(true);

System.exit(result?0:1);

}

}

4第二次查看最終結果

Itstar  c.txt-->2 b.txt-->2 a.txt-->3

pingping c.txt-->1 b.txt-->3 a.txt-->1

ss c.txt-->1 b.txt-->1 a.txt-->2

7.9 找博客共同好友案例

1需求:

如下是博客的好友列表數據,冒號前是一個用戶,冒號後是該用戶的全部好友(數據中的好友關係是單向的)

 

求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?

2)需求分析:

求出AB、C….等是好友

第一次輸出結果

A I,K,C,B,G,F,H,O,D,

B A,F,J,E,

C A,E,B,H,F,G,K,

D G,C,K,A,L,F,E,H,

E G,M,L,H,A,F,B,D,

F L,M,D,C,G,A,

G M,

H O,

I O,C,

J O,

K B,

L D,E,

M E,F,

O A,H,I,J,F,

第二次輸出結果

A-B E C

A-C D F

A-D E F

A-E D B C

A-F O B C D E

A-G F E C D

….

3)代碼實現:

1)第一次Mapper

package com.itstar.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

 

@Override

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)

throws IOException, InterruptedException {

// 1 獲取一行 A:B,C,D,F,E,O

String line = value.toString();

 

// 2 切割

String[] fields = line.split(":");

 

// 3 獲取person和好友

String person = fields[0];

String[] friends = fields[1].split(",");

 

// 4寫出去

for(String friend: friends){

// 輸出 <好友,人>

context.write(new Text(friend), new Text(person));

}

}

}

(2)第一次Reducer

package com.itstar.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

 

@Override

protected void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

 

StringBuffer sb = new StringBuffer();

//1 拼接

for(Text person: values){

sb.append(person).append(",");

}

 

//2 寫出

context.write(key, new Text(sb.toString()));

}

}

(3)第一次Driver

package com.itstar.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OneShareFriendsDriver {

 

public static void main(String[] args) throws Exception {

// 1 獲取job對象

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 2 指定jar包運行的路徑

job.setJarByClass(OneShareFriendsDriver.class);

 

// 3 指定map/reduce使用的類

job.setMapperClass(OneShareFriendsMapper.class);

job.setReducerClass(OneShareFriendsReducer.class);

 

// 4 指定map輸出的數據類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

 

// 5 指定最終輸出的數據類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 

// 6 指定job的輸入原始所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 提交

boolean result = job.waitForCompletion(true);

 

System.exit(result?0:1);

}

}

(4)第二次Mapper

package com.itstar.mapreduce.friends;

import java.io.IOException;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// A I,K,C,B,G,F,H,O,D,

// 友 人,人,人

String line = value.toString();

String[] friend_persons = line.split("\t");

 

String friend = friend_persons[0];

String[] persons = friend_persons[1].split(",");

 

Arrays.sort(persons);

 

for (int i = 0; i < persons.length - 1; i++) {

 

for (int j = i + 1; j < persons.length; j++) {

// 發出 <-人,好友> ,這樣,相同的-對的全部好友就會到同1reduce中去

context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));

}

}

}

}

(5)第二次Reducer

package com.itstar.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

 

@Override

protected void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

 

StringBuffer sb = new StringBuffer();

 

for (Text friend : values) {

sb.append(friend).append(" ");

}

 

context.write(key, new Text(sb.toString()));

}

}

(6)第二次Driver

package com.itstar.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TwoShareFriendsDriver {

 

public static void main(String[] args) throws Exception {

// 1 獲取job對象

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 2 指定jar包運行的路徑

job.setJarByClass(TwoShareFriendsDriver.class);

 

// 3 指定map/reduce使用的類

job.setMapperClass(TwoShareFriendsMapper.class);

job.setReducerClass(TwoShareFriendsReducer.class);

 

// 4 指定map輸出的數據類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

 

// 5 指定最終輸出的數據類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 

// 6 指定job的輸入原始所在目錄

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 7 提交

boolean result = job.waitForCompletion(true);

System.exit(result?0:1);

}

}

7.10 壓縮/解壓縮案例

7.10.1 數據流的壓縮和解壓縮

CompressionCodec有兩個方法能夠用於輕鬆地壓縮或解壓縮數據。要想對正在被寫入一個輸出流的數據進行壓縮,咱們可使用createOutputStream(OutputStreamout)方法建立一個CompressionOutputStream,將其以壓縮格式寫入底層的流。相反,要想對從輸入流讀取而來的數據進行解壓縮,則調用createInputStream(InputStreamin)函數,從而得到一個CompressionInputStream從而從底層的流讀取未壓縮的數據。

測試一下以下壓縮方式

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

 

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.CompressionCodecFactory;

import org.apache.hadoop.io.compress.CompressionInputStream;

import org.apache.hadoop.io.compress.CompressionOutputStream;

import org.apache.hadoop.util.ReflectionUtils;

 

public class TestCompress {

 

public static void main(String[] args) throws Exception {

compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");

// decompress("e:/hello.txt.bz2");

}

 

  /**

     * 壓縮方法

     *

     * @param fliename 文件路徑+文件名

     * @param method 解碼器

     */

    private static void compress(String filename, String method) throws Exception {

        //建立輸入流

        FileInputStream fis = new FileInputStream(new File(filename));

 

        //經過反射找到解碼器的類

        Class codeClass = Class.forName(method);

 

        //經過反射工具類找到解碼器對象,須要用到配置conf對象

        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, new Configuration());

 

        //建立輸出流

        FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));

 

        //得到解碼器的輸出對象

        CompressionOutputStream cos = codec.createOutputStream(fos);

 

        //流拷貝

        IOUtils.copyBytes(fis,cos,5 * 1024 * 1024,false);

 

        //關閉流

        cos.close();

        fos.close();

        fis.close();

}

 

    /**

     * 解開壓縮

     *

     * @param 文件路徑+文件名

     * @param 後綴

     */

    private static void decompress(String filename, String decoded) throws Exception {

        //獲取factory實例

        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

 

        CompressionCodec codec = factory.getCodec(new Path(filename));

 

        if (codec == null) {

            System.out.println(filename);

            return;

        }

 

        //解壓縮的輸入

        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

 

        //輸出流

        FileOutputStream fos = new FileOutputStream(new File(filename + "." + decoded));

 

        //流拷貝

        IOUtils.copyBytes(cis, fos, 5 * 1024 * 1024, false);

 

        cis.close();

        fos.close();

    }

7.10.2 Map輸出端採用壓縮

即便你的MapReduce的輸入輸出文件都是未壓縮的文件,你仍然能夠對map任務的中間結果輸出作壓縮,由於它要寫在硬盤而且經過網絡傳輸到reduce節點,對其壓縮能夠提升不少性能,這些工做只要設置兩個屬性便可,咱們來看下代碼怎麼設置:

1)給你們提供的hadoop源碼支持的壓縮格式有:BZip2Codec 、DefaultCodec

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.compress.BZip2Codec;

import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.GzipCodec;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordCountDriver {

 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

Configuration configuration = new Configuration();

 

// 開啓map端輸出壓縮

configuration.setBoolean("mapreduce.map.output.compress", true);

// 設置map端輸出壓縮方式

configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

 

Job job = Job.getInstance(configuration);

 

job.setJarByClass(WordCountDriver.class);

 

job.setMapperClass(WordCountMapper.class);

job.setReducerClass(WordCountReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

boolean result = job.waitForCompletion(true);

 

System.exit(result ? 1 : 0);

}

}

2Mapper保持不變

package com.itstar.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

 

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

// 1 獲取一行

String line = value.toString();

// 2 切割

String[] words = line.split(" ");

// 3 循環寫出

for(String word:words){

context.write(new Text(word), new IntWritable(1));

}

}

}

3)Reducer保持不變

package com.itstar.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

 

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

 

int count = 0;

// 1 彙總

for(IntWritable value:values){

count += value.get();

}

 

        // 2 輸出

context.write(key, new IntWritable(count));

}

}

7.10.3 Reduce輸出端採用壓縮

基於workcount案例處理

1修改驅動

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.compress.BZip2Codec;

import org.apache.hadoop.io.compress.DefaultCodec;

import org.apache.hadoop.io.compress.GzipCodec;

import org.apache.hadoop.io.compress.Lz4Codec;

import org.apache.hadoop.io.compress.SnappyCodec;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordCountDriver {

 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

Configuration configuration = new Configuration();

 

Job job = Job.getInstance(configuration);

 

job.setJarByClass(WordCountDriver.class);

 

job.setMapperClass(WordCountMapper.class);

job.setReducerClass(WordCountReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

 

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 設置reduce端輸出壓縮開啓

FileOutputFormat.setCompressOutput(job, true);

 

// 設置壓縮的方式

    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

//     FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

//     FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

    

boolean result = job.waitForCompletion(true);

 

System.exit(result?1:0);

}

}

3) MapperReducer保持不變

 

7.11 兩表Join

未優化版本

Bean.java

import org.apache.hadoop.io.WritableComparable;

 

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

/*

 * 人員和地址的通用bean

 */

public class Bean implements WritableComparable<Bean> {

    private String userNo = "";

    private String userName = "";

    private String addreNo = "";

    private String addreName = "";

    private int flag;

 

    public Bean(Bean bean) {

        this.userName = bean.getUserName();

        this.userNo = bean.getUserNo();

        this.addreName = bean.getAddreName();

        this.addreNo = bean.getAddreNo();

        this.flag = bean.getFlag();

    }

 

    public Bean() {

        super();

        // TODO Auto-generated constructor stub

    }

 

    public Bean(String userNo, String userName, String addreNo,

                String addreName, int flag) {

        super();

        this.userNo = userNo;

        this.userName = userName;

        this.addreNo = addreNo;

        this.addreName = addreName;

        this.flag = flag;

    }

 

    public String getUserNo() {

        return userNo;

    }

 

    public void setUserNo(String userNo) {

        this.userNo = userNo;

    }

 

    public String getUserName() {

        return userName;

    }

 

    public void setUserName(String userName) {

        this.userName = userName;

    }

 

    public String getAddreNo() {

        return addreNo;

    }

 

    public void setAddreNo(String addreNo) {

        this.addreNo = addreNo;

    }

 

    public String getAddreName() {

        return addreName;

    }

 

    public void setAddreName(String addreName) {

        this.addreName = addreName;

    }

 

    public int getFlag() {

        return flag;

    }

 

    public void setFlag(int flag) {

        this.flag = flag;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeUTF(userNo);

        out.writeUTF(userName);

        out.writeUTF(addreNo);

        out.writeUTF(addreName);

        out.writeInt(flag);

 

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        this.userNo = in.readUTF();

        this.userName = in.readUTF();

        this.addreNo = in.readUTF();

        this.addreName = in.readUTF();

        this.flag = in.readInt();

 

    }

 

    @Override

    public int compareTo(Bean arg0) {

        // TODO Auto-generated method stub

        return 0;

    }

 

    @Override

    public String toString() {

        return "userNo=" + userNo + ", userName=" + userName + ", addreNo="

                + addreNo + ", addreName=" + addreName;

    }

 

}

PersonAddrMap.java

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

import java.io.IOException;

 

public class PersonAddrMap extends Mapper<LongWritable, Text, IntWritable, Bean> {

    @Override

    protected void map(LongWritable key, Text value,

                       Mapper<LongWritable, Text, IntWritable, Bean>.Context context)

            throws IOException, InterruptedException {

        String line = value.toString();

        String str[] = line.split(" ");

        if (str.length == 2) { //地區信息表

            Bean bean = new Bean();

            bean.setAddreNo(str[0]);

            bean.setAddreName(str[1]);

            bean.setFlag(0); // 0表示地區

            context.write(new IntWritable(Integer.parseInt(str[0])), bean);

        } else { //人員信息表

            Bean bean = new Bean();

            bean.setUserNo(str[0]);

            bean.setUserName(str[1]);

            bean.setAddreNo(str[2]);

            bean.setFlag(1); // 1表示人員表

            context.write(new IntWritable(Integer.parseInt(str[2])), bean);

        }

    }

}

 

PersonAddreRedu.java

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

 

public class PersonAddreRedu extends Reducer<IntWritable, Bean, NullWritable,Text> {

    @Override

    protected void reduce(IntWritable key, Iterable<Bean> values,

                          Reducer<IntWritable, Bean, NullWritable, Text>.Context context)

            throws IOException, InterruptedException {

        Bean Addre = null;

        List<Bean> peoples = new ArrayList<Bean>();

/*

 * 若是values的第一個元素信息就是地址Addre的信息的話,

 * 咱們就再也不須要一個List來緩存person信息了,values後面的全是人員信息

 * 將減小巨大的內存空間

 */

/*

 * partitioner和shuffer的過程:

 * partitioner的主要功能是根據reduce的數量將map輸出的結果進行分塊,將數據送入到相應的reducer.

 * 全部的partitioner都必須實現partitioner接口並實現getPartition方法,該方法的返回值爲int類型,而且取值範圍在0~(numOfReducer-1),

 * 從而能將map的輸出輸入到對應的reducer中,對於某個mapreduce過程,hadoop框架定義了默認的partitioner爲HashPartioner,

 * 該partitioner使用key的hashCode來決定將該key輸送到哪一個reducer;

 * shuffle將每一個partitioner輸出的結果根據key進行group以及排序,將具備相同key的value構成一個values的迭代器,並根據key進行排序分別調用

 * 開發者定義的reduce方法進行排序,所以mapreducer的因此key必須實現comparable接口的compareto()方法從而能實現兩個key對象的比較

 */

/*

 * 咱們須要自定義key的數據結構(shuffle按照key進行分組)來知足共同addreNo的狀況下地址表的更小需求

 *

 */

        for (Bean bean : values) {

            if (bean.getFlag() == 0) { // 表示地區表

                Addre = new Bean(bean);

            } else {

                peoples.add(new Bean(bean)); //添加到peoplelist中

            }

        }

        for (Bean peo : peoples) { // 給peoplelist添加地區名字

            peo.setAddreName(Addre.getAddreName());

            context.write(NullWritable.get(), new Text(peo.toString()));

        }

    }

}

 

PersonAddreMain.java

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class PersonAddreMain {

    public static void main(String[] args) throws Exception {

 

        args = new String[] { "F:\\A\\join\\", "F:\\A\\out" };

 

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(PersonAddreMain.class);

 

        job.setMapperClass(PersonAddrMap.class);

        job.setMapOutputKeyClass(IntWritable.class);

        job.setMapOutputValueClass(Bean.class);

 

        job.setReducerClass(PersonAddreRedu.class);

        job.setOutputKeyClass(NullWritable.class);

        job.setOutputValueClass(Text.class);

 

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

 

已優化版本

Bean.java

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

/*

 * 人員和地址的通用bean

 * 用做map輸出的value

 */

public class Bean implements WritableComparable<Bean> {

    private String userNo = " ";

    private String userName = " ";

    private String addreNo = " ";

    private String addreName = " ";

 

    public Bean(Bean bean) {

        this.userName = bean.getUserName();

        this.userNo = bean.getUserNo();

        this.addreName = bean.getAddreName();

        this.addreNo = bean.getAddreNo();

    }

 

    public Bean() {

        super();

        // TODO Auto-generated constructor stub

    }

 

    public Bean(String userNo, String userName, String addreNo,

                String addreName, int flag) {

        super();

        this.userNo = userNo;

        this.userName = userName;

        this.addreNo = addreNo;

        this.addreName = addreName;

    }

 

 

    public String getUserNo() {

        return userNo;

    }

 

    public void setUserNo(String userNo) {

        this.userNo = userNo;

    }

 

    public String getUserName() {

        return userName;

    }

 

    public void setUserName(String userName) {

        this.userName = userName;

    }

 

    public String getAddreNo() {

        return addreNo;

    }

 

    public void setAddreNo(String addreNo) {

        this.addreNo = addreNo;

    }

 

    public String getAddreName() {

        return addreName;

    }

 

    public void setAddreName(String addreName) {

        this.addreName = addreName;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeUTF(userNo);

        out.writeUTF(userName);

        out.writeUTF(addreNo);

        out.writeUTF(addreName);

 

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        this.userNo = in.readUTF();

        this.userName = in.readUTF();

        this.addreNo = in.readUTF();

        this.addreName = in.readUTF();

    }

 

    @Override

    public int compareTo(Bean arg0) {

        // TODO Auto-generated method stub

        return 0;

    }

 

    @Override

    public String toString() {

        return "userNo=" + userNo + ", userName=" + userName + ", addreNo="

                + addreNo + ", addreName=" + addreName;

    }

}

BeanKey.java

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

/*

 * map輸出的key

 */

public class BeanKey implements WritableComparable<BeanKey> {

    private int AddreNo;

    private boolean isPrimary; // true:address false:person

 

    public BeanKey(int addreNo, boolean isPrimary) {

        super();

        this.AddreNo = addreNo;

        this.isPrimary = isPrimary;

    }

 

    public BeanKey() {

        super();

        // TODO Auto-generated constructor stub

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(AddreNo);

        out.writeBoolean(isPrimary);

 

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        this.AddreNo = in.readInt();

        this.isPrimary = in.readBoolean();

 

    }

 

    // partitioner執行時調用hashcode()方法和compareTo()方法

    // compareTo()方法做爲shuffle排序的默認方法

    @Override

    public int hashCode() {

        return this.AddreNo; // 按AddreNo進行分組

    }

 

    //用於排序,將相同的AddressNo的地址表和人員表,將地址表放到首位

    @Override

    public int compareTo(BeanKey o) {

        if (this.AddreNo == o.getAddreNo()) { // 若是是同一個AddressNo的數據則判斷是Person仍是Address表

            if (this.isPrimary == o.isPrimary()) {  //若是屬性相同屬於同種類型的表,返回0

                return 0;

            } else {

                return this.isPrimary ? -1 : 1; // true表示Address表 返回更小的值,將排至values隊首

            }

        } else {

            return this.AddreNo - o.getAddreNo() > 0 ? 1 : -1;  //按AddressNo排序

        }

    }

 

    public int getAddreNo() {

        return AddreNo;

    }

 

    public void setAddreNo(int addreNo) {

        AddreNo = addreNo;

    }

 

    public boolean isPrimary() {

        return isPrimary;

    }

 

    public void setPrimary(boolean isPrimary) {

        this.isPrimary = isPrimary;

    }

}

PersonAddrMap.java

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

 

/*

 * map類使key,value分別進行處理

 */

public class PersonAddreMap extends Mapper<LongWritable, Text, BeanKey, Bean> {

    @Override

    protected void map(LongWritable key, Text value,

                       Mapper<LongWritable, Text, BeanKey, Bean>.Context context)

            throws IOException, InterruptedException {

        String line = value.toString();

        String str[] = line.split(" ");

        if (str.length == 2) {

            // Addre表

            Bean Addre = new Bean();

            Addre.setAddreNo(str[0]);

            Addre.setAddreName(str[1]);

 

            BeanKey AddreKey = new BeanKey();

            AddreKey.setAddreNo(Integer.parseInt(str[0]));

            AddreKey.setPrimary(true); // true表示地區表

            context.write(AddreKey, Addre);

        } else {

            // Person表

            Bean Person = new Bean();

            Person.setUserNo(str[0]);

            Person.setUserName(str[1]);

            Person.setAddreNo(str[2]);

 

            BeanKey PerKey = new BeanKey();

            PerKey.setAddreNo(Integer.parseInt(str[2]));

            PerKey.setPrimary(false);// false表示人員表

            context.write(PerKey, Person);

 

        }

    }

 

}

 

PersonAddreRedu.java

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

import java.io.IOException;

 

public class PersonAddreReduce extends Reducer<BeanKey, Bean, NullWritable, Text> {

    @Override

    protected void reduce(BeanKey key, Iterable<Bean> values,

                          Reducer<BeanKey, Bean, NullWritable, Text>.Context context)

            throws IOException, InterruptedException {

        Bean Addre = null;

        int num = 0;

        for (Bean bean : values) {

            if (num == 0) {

                Addre = new Bean(bean); // Address地址表爲values的第一個值

                num++;

            } else {

                // 其他全爲person表

                // 沒有list數組,節省大量內存空間

                bean.setAddreName(Addre.getAddreName());

                context.write(NullWritable.get(), new Text(bean.toString()));

            }

        }

    }

}

 

PersonAddreRedu.java

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

import java.io.IOException;

 

public class PersonAddreReduce extends Reducer<BeanKey, Bean, NullWritable, Text> {

    @Override

    protected void reduce(BeanKey key, Iterable<Bean> values,

                          Reducer<BeanKey, Bean, NullWritable, Text>.Context context)

            throws IOException, InterruptedException {

        Bean Addre = null;

        int num = 0;

        for (Bean bean : values) {

            if (num == 0) {

                Addre = new Bean(bean); // Address地址表爲values的第一個值

                num++;

            } else {

                // 其他全爲person表

                // 沒有list數組,節省大量內存空間

                bean.setAddreName(Addre.getAddreName());

                context.write(NullWritable.get(), new Text(bean.toString()));

            }

        }

    }

}

 

PKFKCompartor.java

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

/*

 * 實現Group分組

 * shuffle的group過程默認的是使用的key(BeanKey)的compareTo()方法

 * 剛纔咱們添加的自定義的Key沒有辦法將具備相同AddressNo的地址和人員放到同一個group中(由於從compareTo()方法中能夠看出他們是不相等的)

 * 咱們須要的就是本身定義一個groupComparer就能夠

 * 實現比較器

 */

public class PKFKCompartor extends WritableComparator {

 

    protected PKFKCompartor() {

        super(BeanKey.class, true);

    }

 

    //兩個BeanKey進行比較排序

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

        BeanKey a1 = (BeanKey) a;

        BeanKey b1 = (BeanKey) b;

        if (a1.getAddreNo() == b1.getAddreNo()) {

            return 0;

        } else {

            return a1.getAddreNo() > b1.getAddreNo() ? 1 : -1;

        }

    }

}

 

PersonAddreMain.java

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class PersonAddreMain {

    public static void main(String[] args) throws Exception {

 

        args = new String[]{"F:\\A\\join\\", "F:\\A\\out_Andy1"};

 

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(PersonAddreMain.class);

 

        //設置自定義的group

        job.setGroupingComparatorClass(PKFKCompartor.class);

 

        job.setMapperClass(PersonAddreMap.class);

        job.setMapOutputKeyClass(BeanKey.class);

        job.setMapOutputValueClass(Bean.class);

 

        job.setReducerClass(PersonAddreRedu.class);

        job.setOutputKeyClass(NullWritable.class);

        job.setOutputValueClass(Text.class);

 

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

相關文章
相關標籤/搜索