大數據學習筆記之Hadoop(三):MapReduce&YARN

文章目錄

一 MapReduce概念

Mapreduce是一個分佈式運算程序的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架;
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。html

1.1 爲何要MapReduce

1)海量數據在單機上處理由於硬件資源限制,沒法勝任
2)而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度
3)引入mapreduce框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理。
4)mapreduce分佈式方案考慮的問題
(1)運算邏輯要不要先分後合?
(2)程序如何分配運算任務(切片)?
(3)兩階段的程序如何啓動?如何協調?
(4)整個程序運行過程當中的監控?容錯?重試?
分佈式方案須要考慮不少問題,可是咱們能夠將分佈式程序中的公共功能封裝成框架,讓開發人員將精力集中於業務邏輯上。而mapreduce就是這樣一個分佈式程序的通用框架。前端

1.2 MapReduce核心思想

在這裏插入圖片描述
上圖簡單的闡明瞭map和reduce的兩個過程或者做用,雖然不夠嚴謹,可是足以提供一個大概的認知,map過程是一個蔬菜到製成食物前的準備工做,reduce將準備好的材料合併進而製做出食物的過程
在這裏插入圖片描述
有一點小小的不同
在這裏插入圖片描述
3個節點每一個節點都存一份部分數據如圖紅色123所示(將數據分紅多份,即分割成多塊保存到節點中),mapTask是具體幹活的,把紅色和綠色的123,傳過來的數據進行分配調度。
若是用戶邏輯很是複雜,那就只能多個MapReduce程序,串行運行的意思是,將第一個MapReduce的結果再做爲輸入後面在進行一次MapReduce,如此往復。java

1)分佈式的運算程序每每須要分紅至少2個階段
2)第一個階段的maptask併發實例,徹底並行運行,互不相干
3)第二個階段的reduce task併發實例互不相干,可是他們的數據依賴於上一個階段的全部maptask併發實例的輸出
4)MapReduce編程模型只能包含一個map階段和一個reduce階段,若是用戶的業務邏輯很是複雜,那就只能多個mapreduce程序,串行運行node

1.3 MapReduce進程

一個完整的mapreduce程序在分佈式運行時有三類實例進程:
1)MrAppMaster:負責整個程序的過程調度及狀態協調(負責MapTask和ReduceTask的協調和調度)
2)MapTask:負責map階段的整個數據處理流程
3)ReduceTask:負責reduce階段的整個數據處理流程web

1.4 MapReduce編程規範(八股文)

用戶編寫的程序分紅三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承本身的父類
(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)K-行號,V-行裏面的內容
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask進程)對每個<K,V>調用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承本身的父類
(2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
3)Driver階段
整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象
4)案例實操算法

詳見3.1.1統計一堆文件中單詞出現的個數(WordCount案例)。spring

1.5 MapReduce程序運行流程分析

在這裏插入圖片描述
提交Job,而後就知道了須要幾個Maptask,並且每一個task負責其中每一個文件中的那一段,而後經過inputformat框架,而後就會去文本文件中一行一行的讀,讀取完以後以kv對的形式存在Map task中,而後把kv對給了wordcount mapper,即咱們本身程序定義的wordcound mapper,key就是行號,value就是這一行的文本數據,講過map方法以後,經過Context.write寫出去,寫到outputCollector(出處緩衝區)中,而後根據k進行分區排序,這裏的分區決定了後面reduce的個數,以後等到全部的maptask任務都完成了,啓動相應數量的reducetask,並告知reducetask處理數據的範文,即一個reducetask只須要處理分區中的一部分,通過reduce task處理完以後,經過輸出礦建outputformat輸出結果。inputformat和outputfomat均可以本身定義,控制輸入和輸出。最後的最後就到了上面的例子中(詳見3.1.1),圖裏面可以看到的part-r-00001數據庫

1)在MapReduce程序讀取文件的輸入目錄上存放相應的文件。
2)客戶端程序在submit()方法執行前,獲取待處理的數據信息,而後根據集羣中參數的配置造成一個任務分配規劃。
3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啓動MRAppMaster。
4)MRAppMaster啓動後根據本次job的描述信息,計算出須要的maptask實例數量,而後向集羣申請機器啓動相應數量的maptask進程。
5)maptask利用客戶指定的inputformat來讀取數據,造成輸入KV對。
6)maptask將輸入KV對傳遞給客戶定義的map()方法,作邏輯運算
7)map()運算完畢後將KV對收集到maptask緩存。
8)maptask緩存中的KV對按照K分區排序後不斷寫到磁盤文件
9)MRAppMaster監控到全部maptask進程任務完成以後,會根據客戶指定的參數啓動相應數量的reducetask進程,並告知reducetask進程要處理的數據分區。
10)Reducetask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算。
11)Reducetask運算完畢後,調用客戶指定的outputformat將結果數據輸出到外部存儲。apache

二 MapReduce理論篇

2.1 Writable序列化

序列化就是把內存中的對象,轉換成字節序列(或其餘數據傳輸協議)以便於存儲(持久化)和網絡傳輸。
反序列化就是將收到字節序列(或其餘數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。編程

2.1.1 經常使用數據序列化類型

經常使用的數據類型對應的hadoop數據序列化類型
Java類型 Hadoop Writable類型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
string Text
map MapWritable
array ArrayWritable

2.1.2 自定義bean對象實現序列化接口

1)自定義bean對象要想序列化傳輸,必須實現序列化接口,須要注意如下7項。
(1)必須實現Writable接口
(2)反序列化時,須要反射調用空參構造函數,因此必須有空參構造
(3)重寫序列化方法
(4)重寫反序列化方法
(5)注意反序列化的順序和序列化的順序徹底一致
(6)要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用
(7)若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序

能夠到下面直接看實戰

// 1 必須實現Writable接口
public class FlowBean implements Writable {

	private long upFlow;
	private long downFlow;
	private long sumFlow;

	//2 反序列化時,須要反射調用空參構造函數,因此必須有
	public FlowBean() {
		super();
	}

	/** * 3重寫序列化方法 * * @param out * @throws IOException */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	/** * 4 重寫反序列化方法 5 注意反序列化的順序和序列化的順序徹底一致 * * @param in * @throws IOException */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

    // 6要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

    //7 若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序
	@Override
	public int compareTo(FlowBean o) {
		// 倒序排列,從大到小
		return this.sumFlow > o.getSumFlow() ? -1 : 1;
	}
}

2)案例實操
詳見3.2.1統計每個手機號耗費的總上行流量、下行流量、總流量(序列化)。

2.2 InputFormat數據切片機制

2.2.1 FileInputFormat切片機制

1)job提交流程源碼詳解
waitForCompletion()
submit();
// 1創建鏈接
connect();
// 1)建立提交job的代理
new Cluster(getConfiguration());
// (1)判斷是本地yarn仍是遠程
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)建立給集羣提交數據的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid ,並建立job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到集羣
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計算切片,生成切片規劃文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
在這裏插入圖片描述
在這裏插入圖片描述
從這裏能看到爲何默認切片大小等於塊的大小
在這裏插入圖片描述
每次切片後看是否大於塊的1.1倍,大於的時候就新加一個切片,不大於就劃分爲一個切片。
// 5)向Stag路徑寫xml配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交狀態
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
在這裏插入圖片描述
2)FileInputFormat源碼解析(input.getSplits(job))
(1)找到你數據存儲的目錄。
(2)開始遍歷處理(規劃切片)目錄下的每個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M
c)默認狀況下,切片大小=blocksize
d)開始切,造成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M( 每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就劃分一塊切片
e)將切片信息寫到一個切片規劃文件中
f)整個切片的核心過程在getSplit()方法中完成。
g)數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分紅分片進行存儲。InputSplit只記錄了分片的元數據信息,好比起始位置、長度以及所在的節點列表等。
h)注意:block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分。
(4)提交切片規劃文件到yarn上,yarn上的MrAppMaster就能夠根據切片規劃文件計算開啓maptask個數。
3)FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內容長度進行切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集總體,而是逐個針對每個文件單獨切片
好比待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
通過FileInputFormat的切片機制運算後,造成的切片信息以下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
4)FileInputFormat切片大小的參數配置
(1)經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值爲1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
所以, 默認狀況下,切片大小=blocksize。
maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值。
minsize (切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。
5)獲取切片信息API
// 根據文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();

2.2.2 CombineTextInputFormat切片機制

關於大量小文件的優化策略
1)默認狀況下TextInputformat對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣若是有大量小文件,就會產生大量的maptask,處理效率極其低下。
2)優化策略
(1)最好的辦法,在數據處理系統的最前端(預處理/採集),將小文件先合併成大文件,再上傳到HDFS作後續分析。
(2)補救措施:若是已是大量小文件在HDFS中了,可使用另外一種InputFormat來作切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask。
(3)優先知足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現步驟
// 9 若是不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述
4)案例實操
詳見3.1.4 需求4:大量小文件的切片優化(CombineTextInputFormat)。

2.2.3 自定義InputFormat

1)概述
(1)自定義一個InputFormat
(2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV
(3)在輸出時使用SequenceFileOutPutFormat輸出合併文件
2)案例實操
詳見3.5小文件處理(自定義InputFormat)。

2.3 MapTask工做機制

1)問題引出
maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,mapTask並行任務是否越多越好呢?

在這裏插入圖片描述若是是100m一個map task,須要將第一個塊的後半部分分出來到第二個塊中,效率確定低,因此每128,即一塊一個map task
2)MapTask並行度決定機制
一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
3)MapTask工做機制
(1)Read階段:Map Task經過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value。
(3)Collect階段:在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩衝區中。
(4)Spill階段:即「溢寫」,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,而後按照key進行排序。這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照key有序。
步驟2:按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。若是用戶設置了Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當期內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
(5)Combine階段:當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。
當全部數據處理完後,MapTask會將全部臨時文件合併成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。
在進行文件合併過程當中,MapTask以分區爲單位進行合併。對於某個分區,它將採用多輪遞歸合併的方式。每輪合併io.sort.factor(默認100)個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。
讓每一個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

2.4 Shuffle機制(重點)

概述
1 mapreduce中,map階段處理的數據如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程叫shuffle,
2 shuffle 洗牌、發牌(核心機制:數據分區、排序、緩存)
3 具體來講:就是將maptask輸出的處理結果數據,分發給reducetask,比你高在分發的過程當中對數據按 key 進行了分區和排序。

2.4.1 Shuffle機制

Mapreduce確保每一個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出做爲輸入傳給reducer)稱爲shuffle。
在這裏插入圖片描述
在這裏插入圖片描述
方框中就是shuffle
merge合併,merge後面是歸併。

2.4.2 MapReduce工做流程

1)流程示意圖

在這裏插入圖片描述
對文件進行切片等處理,計算切片,運算完了以後會得出三個文件,job.xml、wc.jar(若是是在本地的話是沒有的)、job.xml,傳到yarn上,yarn從 MapReduce上提交過來的信息,獲取切片信息,計算出maptask的數量,經過inputformate讀取數據,kv對到了用戶自定義的Mapper中,而後重寫map,map以前有setup,map以後又cleanout方法,而後經過context.write方法將數據寫出,寫到緩衝區,coutputconllector,在緩衝區中仍是kv對的形式,由於只map方法階段,進行了切割,緩衝區默認是100m,到達80的時候,就須要往外面寫了,不然會溢出,而後就會進行分區、排序,分區上面的例子中已經自定義過了,默認是hashPartitioner方式(key的hashcode值,對mapTask個數取餘),具體的看2.4.3,溢出到了文件,尚未到最終的分區,只是說緩存不夠了,須要快速的找個地方先存起來,這裏多是一段一段的,最後再進行一次歸併排序。可是看歸併排序那裏仍是<a,1><a,1>,爲何不是<a,2>呢?因此在分區排序的下面就有的合併, 經合併後的數據再給歸併,combiner和reduce是有區別的具體看 2.4.6。第9第10,實際上是存在磁盤中的,這也就是爲何hadoop比較慢的緣由,

在這裏插入圖片描述
上面兩個圖中黃色框的部分,都是咱們能夠自定義的,紅色的都是不可改變的

2)流程詳解
上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,以下:
1)maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中
2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合併成大的溢出文件
4)在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序
5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)
7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。
緩衝區的大小能夠經過參數調整,參數:io.sort.mb 默認100M

2.4.3 partition分區

0)問題引出:要求將統計結果按照條件輸出到不一樣文件中(分區)。好比:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)
1)默認partition分區

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
默認分區是根據key的hashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區

2)自定義Partitioner步驟
(1)自定義類繼承Partitioner,從新getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號碼的前三位
		String preNum = key.toString().substring(0, 3);
		
		int partition = 4;
		
		// 2 判斷是哪一個省
		if ("136".equals(preNum)) {
			partition = 0;
		}else if ("137".equals(preNum)) {
			partition = 1;
		}else if ("138".equals(preNum)) {
			partition = 2;
		}else if ("139".equals(preNum)) {
			partition = 3;
		}
		return partition;
	}
}

(2)在job驅動中,設置自定義partitioner:
job.setPartitionerClass(CustomPartitioner.class)
(3)自定義partition後,要根據自定義partitioner的邏輯設置相應數量的reduce task
job.setNumReduceTasks(5);
3)注意:
若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;

例如:假設自定義分區數爲5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
4)案例實操
詳見3.2.2 需求2:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(Partitioner)
詳見3.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner)

2.4.4 排序

排序是MapReduce框架中最重要的操做之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。
對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。
對於Reduce Task,它從每一個Map Task上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。
每一個階段的默認排序
1)排序的分類:
(1)部分排序:
MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每一個文件內部排序。
(2)全排序:
如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構。
替代方案:首先建立一系列排好序的文件;其次,串聯這些文件;最後,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。

2)自定義排序WritableComparable
(1)原理分析
bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序
@Override
public int compareTo(FlowBean o) {
// 倒序排列,從大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
(2)案例實操
詳見3.2.3 需求3:將統計結果按照總流量倒序排序(排序)

2.4.5 GroupingComparator分組

1)對reduce階段的數據根據某一個或幾個字段進行分組。
2)案例實操
詳見3.3 求出每個訂單中最貴的商品(GroupingComparator)

2.4.6 Combiner合併

1)combiner是MR程序中Mapper和Reducer以外的一種組件
2)combiner組件的父類就是Reducer
combiner和Reducer是有區別的,combiner是在每一個maptask都有一個,Reducer是將全部的mapTask的結果進行合併後去執行。
在這裏插入圖片描述
3)combiner和reducer的區別在於運行的位置:
Combiner是在每個maptask所在的節點運行
Reducer是接收全局全部Mapper的輸出結果;
4)combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量
5)自定義Combiner實現步驟:
(1)自定義一個combiner繼承Reducer,重寫reduce方法

//前兩個參數是map的輸出
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	//傳進來的數據<a,1><a,1><a,1> 但願輸出<a,3>
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		int count = 0;
		for(IntWritable v :values){
			count += v.get();
		}
		context.write(key, new IntWritable(count));
	}
}

經過和reduce相比,能夠發現基本上是沒有區別的。
在這裏插入圖片描述
教程中有個文件有兩個hive
在這裏插入圖片描述
因此輸入變成了14,具體 我也想不通,
輸出變成了13,這個好像說得通

若是將job.setCombinerClass(WordcountCombiner.class);(這句話的意思在本小節的下面)註釋掉的話,
在這裏插入圖片描述
(2)在job中設置:
job.setCombinerClass(WordcountCombiner.class);
6)combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等於 (5+4)/2=9/2
7)案例實操
詳見3.1.3需求3:對每個maptask的輸出局部彙總(Combiner)

2.4.7 數據傾斜&Distributedcache

1)數據傾斜緣由
若是是多張表的操做都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。
2)實操案例:
詳見3.4.1 需求1:reduce端表合併(數據傾斜)
3)解決方案
在map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。
4)具體辦法:採用distributedcache
(1)在mapper的setup階段,將文件讀取到緩存集合中
(2)在驅動函數中加載緩存。
job.addCacheFile(new URI(「file:/e:/mapjoincache/pd.txt」));// 緩存普通文件到task運行節點
5)實操案例:
詳見3.4.2需求2:map端表合併(Distributedcache)

2.5 ReduceTask工做機制

1)設置ReduceTask
reducetask的並行度一樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不一樣,Reducetask數量的決定是能夠直接手動設置:
//默認值是1,手動設置爲4
job.setNumReduceTasks(4);
2)注意
(1)若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜
(2)reducetask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask。
(3)具體多少個reducetask,須要根據集羣性能而定。
(4)若是分區數不是1,可是reducetask爲1,是否執行分區過程。答案是:不執行分區過程。由於在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1確定不執行。
3)實驗:測試reducetask多少合適。
(1)實驗環境:1個master節點,16個slave節點: CPU:8GHZ , 內存: 2G
(2)實驗結論:
表1 改變reduce task (數據量爲1GB)
Map task =16
Reduce task 1 5 10 15 16 20 25 30 45 60
總時間 892 146 110 92 88 100 128 101 145 104
4)ReduceTask工做機制
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中。
(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。
(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行彙集的一組數據。爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。
(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。

2.6 自定義OutputFormat

1)概述
(1)要在一個mapreduce程序中根據數據的不一樣輸出兩類結果到不一樣目錄,這類靈活的輸出需求能夠經過自定義outputformat來實現。
(1)自定義outputformat,
(2)改寫recordwriter,具體改寫輸出數據的方法write()
2)實操案例:
詳見3.6 修改日誌內容及自定義日誌輸出路徑(自定義OutputFormat)。

2.7 MapReduce數據壓縮

2.7.1 概述

壓縮技術可以有效減小底層存儲系統(HDFS)讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在Hadood下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下,I/O操做和網絡數據傳輸要花大量的時間。還有,Shuffle與Merge過程一樣也面臨着巨大的I/O壓力。
鑑於磁盤I/O和網絡帶寬是Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤I/O和網絡傳輸很是有幫助。不過,儘管壓縮與解壓操做的CPU開銷不高,其性能的提高和資源的節省並不是沒有代價。
若是磁盤I/O和網絡帶寬影響了MapReduce做業性能,在任意MapReduce階段啓用壓縮均可以改善端到端處理時間並減小I/O和網絡流量。
壓縮mapreduce的一種優化策略:經過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減小磁盤IO,提升MR程序運行速度(但相應增長了cpu運算負擔)
注意:壓縮特性運用得當能提升性能,但運用不當也可能下降性能
基本原則:
(1)運算密集型的job,少用壓縮
(2)IO密集型的job,多用壓縮

2.7.2 MR支持的壓縮編碼

壓縮格式 工具 算法 文件擴展名 是否可切分
DEFAULT 無 DEFAULT .deflate 否
Gzip gzip DEFAULT .gz 否
bzip2 bzip2 bzip2 .bz2 是
LZO lzop LZO .lzo 否
LZ4 無 LZ4 .lz4 否
Snappy 無 Snappy .snappy 否
爲了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,以下表所示
壓縮格式 對應的編碼/解碼器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec
壓縮性能的比較
壓縮算法 原始文件大小 壓縮文件大小 壓縮速度 解壓速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

2.7.3 採用壓縮的位置

壓縮能夠在MapReduce做用的任意階段啓用。
在這裏插入圖片描述
1)輸入壓縮:
在有大量數據並計劃重複處理的狀況下,應該考慮對輸入進行壓縮。然而,你無須顯示指定使用的編解碼方式。Hadoop自動檢查文件擴展名,若是擴展名可以匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓。不然,Hadoop就不會使用任何編解碼器。
2)壓縮mapper輸出:
當map任務輸出的中間數據量很大時,應考慮在此階段採用壓縮技術。這能顯著改善內部數據Shuffle過程,而Shuffle過程在Hadoop處理過程當中是資源消耗最多的環節。若是發現數據量大形成網絡傳輸緩慢,應該考慮使用壓縮技術。可用於壓縮mapper輸出的快速編解碼器包括LZO、LZ4或者Snappy。
注:LZO是供Hadoop壓縮數據用的通用壓縮編解碼器。其設計目標是達到與硬盤讀取速度至關的壓縮速度,所以速度是優先考慮的因素,而不是壓縮率。與gzip編解碼器相比,它的壓縮速度是gzip的5倍,而解壓速度是gzip的2倍。同一個文件用LZO壓縮後比用gzip壓縮後大50%,但比壓縮前小25%~50%。這對改善性能很是有利,map階段完成時間快4倍。
3)壓縮reducer輸出:
在此階段啓用壓縮技術可以減小要存儲的數據量,所以下降所需的磁盤空間。當mapreduce做業造成做業鏈條時,由於第二個做業的輸入也已壓縮,因此啓用壓縮一樣有效。

2.7.4 壓縮配置參數

要在Hadoop中啓用壓縮,能夠配置以下參數(mapred-site.xml文件中):
參數 默認值 階段 建議
io.compression.codecs
(在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.Lz4Codec 輸入壓縮 Hadoop使用文件擴展名判斷是否支持某種編解碼器
mapreduce.map.output.compress false mapper輸出 這個參數設爲true啓用壓縮
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec mapper輸出 使用LZO、LZ4或snappy編解碼器在此階段壓縮數據
mapreduce.output.fileoutputformat.compress false reducer輸出 這個參數設爲true啓用壓縮
mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress. DefaultCodec reducer輸出 使用標準工具或者編解碼器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type RECORD reducer輸出 SequenceFile輸出使用的壓縮類型:NONE和BLOCK

2.8 計數器應用

Hadoop爲每一個做業維護若干內置計數器,以描述多項指標。例如,某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量。

1)API
(1)採用枚舉的方式統計計數
enum MyCounter{MALFORORMED,NORMAL}
//對枚舉定義的自定義計數器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)採用計數器組、計數器名稱的方式統計
context.getCounter(「counterGroup」, 「countera」).increment(1);
組名和計數器名稱隨便起,但最好有意義。
(3)計數結果在程序運行後的控制檯上查看。
2)案例實操
詳見3.6 修改日誌內容及自定義日誌輸出路徑(自定義OutputFormat)。

2.9 數據清洗

1)概述
在運行Mapreduce程序以前,每每要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程每每只須要運行mapper程序,不須要運行reduce程序。
2)實操案例
詳見3.7 日誌清洗(數據清洗)。

2.10 MapReduce與Yarn

2.10.1 Yarn概述

Yarn是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而mapreduce等運算程序則至關於運行於操做系統之上的應用程序

2.10.2 Yarn的重要概念

1)Yarn並不清楚用戶提交的程序的運行機制
2)Yarn只提供運算資源的調度(用戶程序向Yarn申請資源,Yarn就負責分配資源)
3)Yarn中的主管角色叫ResourceManager
4)Yarn中具體提供運算資源的角色叫NodeManager
5)這樣一來,Yarn其實就與運行的用戶程序徹底解耦,就意味着Yarn上能夠運行各類類型的分佈式運算程序(mapreduce只是其中的一種),好比mapreduce、storm程序,spark程序……
6)因此,spark、storm等運算框架均可以整合在Yarn上運行,只要他們各自的框架中有符合Yarn規範的資源請求機制便可
7)Yarn就成爲一個通用的資源調度平臺,今後,企業中之前存在的各類運算集羣均可以整合在一個物理集羣上,提升資源利用率,方便數據共享

2.10.3 Yarn工做機制

1)Yarn運行機制
在這裏插入圖片描述
submit提交過程當中其實就是提交的xml,切片,和相應的jar包,在yarn上能看到jar包,在本地看不到,本地運行的話是localrunner,想要想ResourceManager提交,首先申請一個資源,而後資源的提交路徑是hdfs://xxxx以及applicaton_id,而後就開始提交文件(split xml jar)到yarn上,資源提交完畢,申請運行mrAppMaster,而後將提交的這些文件轉化成任務,放入到調度隊列中,NodeManager去任務隊列中領取任務,而後建立一個容器來處理任務,在這個容器中須要分配好cpu和內存,而且建立一個MRAappmaster,即MR程序(參看第0步)的老大,而後從集羣上讀取三個東西到容器中,讀取到切片信息後,MRAppmaster經過切片信息申請maptask容器,具體申請幾個在split中已經寫了,具體申請開誰,ResourceManager說了算,而後兩個nodeManager領到了兩個任務,具體是兩個仍是幾個實際狀況而定,領到任務以後也須要一個包含申請到cpu 內存 jar包的容器,而後MRAppmaster和nodemanager進行了關聯,即可以進行分區排序,而後map階段完成,而後ResourceManager開始申請運行reducetask程序, 一樣申請了兩個nodeManager,而且從上面的map中獲取相應的分區的數據,以後一切運行完以後,MRAppmaster會想ResourceManager申請註銷本身,包括後面的四個(舉例數字)nodeManager也隨之會被註銷。
2)工做機制詳解
(0)Mr程序提交到客戶端所在的節點
(1)yarnrunner向Resourcemanager申請一個application。
(2)rm將該應用程序的資源路徑返回給yarnrunner
(3)該程序將運行所需資源提交到HDFS上
(4)程序資源提交完畢後,申請運行mrAppMaster
(5)RM將用戶的請求初始化成一個task
(6)其中一個NodeManager領取到task任務。
(7)該NodeManager建立容器Container,併產生MRAppmaster
(8)Container從HDFS上拷貝資源到本地
(9)MRAppmaster向RM 申請運行maptask容器
(10)RM將運行maptask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並建立容器。
(11)MR向兩個接收到任務的NodeManager發送程序啓動腳本,這兩個NodeManager分別啓動maptask,maptask對數據分區排序。
(12)MRAppmaster向RM申請2個容器,運行reduce task。
(13)reduce task向maptask獲取相應分區的數據。
(14)程序運行完畢後,MR會向RM註銷本身。

2.11 做業提交全過程

1)做業提交過程之YARN
在這裏插入圖片描述
2)做業提交過程之MapReduce
在這裏插入圖片描述
3)做業提交過程之讀數據
在這裏插入圖片描述
4)做業提交過程之寫數據
在這裏插入圖片描述

2.12 MapReduce開發總結

mapreduce在編程的時候,基本上一個固化的模式,沒有太多可靈活改變的地方,除了如下幾處:
1)輸入數據接口:InputFormat—>FileInputFormat(文件類型數據讀取的通用抽象類) DBInputFormat (數據庫數據讀取的通用抽象類)
默認使用的實現類是:TextInputFormat
job.setInputFormatClass(TextInputFormat.class)
TextInputFormat的功能邏輯是:一次讀一行文本,而後將該行的起始偏移量做爲key,行內容做爲value返回
2)邏輯處理接口: Mapper
徹底須要用戶本身去實現其中:map() setup() clean()
3)map輸出的結果在shuffle階段會被partition以及sort,此處有兩個接口可自定義:
(1)Partitioner
有默認實現 HashPartitioner,邏輯是 根據key和numReduces來返回一個分區號; key.hashCode()&Integer.MAXVALUE % numReduces
一般狀況下,用默認的這個HashPartitioner就能夠,若是業務上有特別的需求,能夠自定義
(2)Comparable
當咱們用自定義的對象做爲key來輸出時,就必需要實現WritableComparable接口,override其中的compareTo()方法
4)reduce端的數據分組比較接口:Groupingcomparator
reduceTask拿到輸入數據(一個partition的全部數據)後,首先須要對數據進行分組,其分組的默認原則是key相同,而後對每一組kv數據調用一次reduce()方法,而且將這一組kv中的第一個kv的key做爲參數傳給reduce的key,將這一組數據的value的迭代器傳給reduce()的values參數
利用上述這個機制,咱們能夠實現一個高效的分組取最大值的邏輯:
自定義一個bean對象用來封裝咱們的數據,而後改寫其compareTo方法產生倒序排序的效果
而後自定義一個Groupingcomparator,將bean對象的分組邏輯改爲按照咱們的業務分組id來分組(好比訂單號)
這樣,咱們要取的最大值就是reduce()方法中傳進來key
5)邏輯處理接口:Reducer
徹底須要用戶本身去實現其中 reduce() setup() clean()
6)輸出數據接口:OutputFormat—> 有一系列子類FileOutputformat DBoutputFormat …
默認實現類是TextOutputFormat,功能邏輯是:將每個KV對向目標文本文件中輸出爲一行

2.13 MapReduce參數優化

2.13.1 資源相關參數

1)如下參數是在用戶本身的mr應用程序中配置就能夠生效
配置參數 參數說明
mapreduce.map.memory.mb 一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。
mapreduce.reduce.memory.mb 一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。
mapreduce.map.java.opts Map Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g.
「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」 (@taskid@會被Hadoop框架自動換爲相應的taskid), 默認值: 「」
mapreduce.reduce.java.opts Reduce Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g.
「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」, 默認值: 「」
mapreduce.map.cpu.vcores 每一個Map task可以使用的最多cpu core數目, 默認值: 1
mapreduce.reduce.cpu.vcores 每一個Reduce task可以使用的最多cpu core數目, 默認值: 1
2)應該在yarn啓動以前就配置在服務器的配置文件中才能生效
配置參數 參數說明
yarn.scheduler.minimum-allocation-mb 1024 給應用程序container分配的最小內存
yarn.scheduler.maximum-allocation-mb 8192 給應用程序container分配的最大內存
yarn.scheduler.minimum-allocation-vcores 1
yarn.scheduler.maximum-allocation-vcores 32
yarn.nodemanager.resource.memory-mb 8192
3)shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好
配置參數 參數說明
mapreduce.task.io.sort.mb 100 shuffle的環形緩衝區大小,默認100m
mapreduce.map.sort.spill.percent 0.8 環形緩衝區溢出的閾值,默認80%

2.13.2 容錯相關參數

配置參數 參數說明
mapreduce.map.maxattempts 每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
mapreduce.reduce.maxattempts 每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
mapreduce.map.failures.maxpercent 當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。
mapreduce.reduce.failures.maxpercent 當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0。
mapreduce.task.timeout Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。

三 MapReduce實戰篇

3.1 WordCount案例

3.1.1 需求1:統計一堆文件中單詞出現的個數(WordCount案例)

0)需求:在一堆給定的文本文件中統計輸出每個單詞出現的總次數
1)數據準備:
在這裏插入圖片描述
在這裏插入圖片描述
2)分析
按照mapreduce編程規範,分別編寫Mapper,Reducer,Driver。
在這裏插入圖片描述
在這裏插入圖片描述

3)編寫程序
將以前的143個hadoopjar包拷貝過來
看一下Mapper類的部分代碼
在這裏插入圖片描述
能夠看到,每有一行,就執行一次map方法
(1)定義一個mapper類

package com.atguigu.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** * KEYIN:默認狀況下,是mr框架所讀到的一行文本的起始偏移量,Long;行號 * 在hadoop中有本身的更精簡的序列化接口,因此不直接用Long,而是用LongWritable * VALUEIN:默認狀況下,是mr框架所讀到的一行文本內容,String;此處用Text * * KEYOUT:是用戶自定義邏輯處理完成以後輸出數據中的key,在此處是單詞,String;此處用Text * VALUEOUT,是用戶自定義邏輯處理完成以後輸出數據中的value,在此處是單詞次數,Integer,此處用IntWritable * @author Administrator */
 //前兩個輸入的key value ,後兩個,輸出的key value
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	/** * map階段的業務邏輯就寫在自定義的map()方法中 * maptask會對每一行輸入數據調用一次咱們自定義的map()方法 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1 將maptask傳給咱們的文本內容先轉換成String
		String line = value.toString();
		
		// 2 根據空格將這一行切分紅單詞
		String[] words = line.split(" ");
		
		// 3 將單詞輸出爲<單詞,1>
		for(String word:words){
			// 將單詞做爲key,將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會到相同的reducetask中
			context.write(new Text(word), new IntWritable(1));
		}
	}
}

(2)定義一個reducer類

package com.atguigu.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** * KEYIN , VALUEIN 對應mapper輸出的KEYOUT, VALUEOUT類型 * * KEYOUT,VALUEOUT 對應自定義reduce邏輯處理結果的輸出數據類型 KEYOUT是單詞 VALUEOUT是總次數 * @author Administrator */
//四個參數爲key value
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	/** * key,是一組相同單詞kv對的key */
	@Override
	//atguigu 1 atguigu 1 atguigu 1
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {

		int count = 0;

		// 1 彙總各個key的個數
		for(IntWritable value:values){
			count +=value.get();
		}
		
		// 2輸出該key的總次數
		context.write(key, new IntWritable(count));
	}
}

(3)定義一個主類,用來描述job並提交job

package com.atguigu.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** * 至關於一個yarn集羣的客戶端, * 須要在此封裝咱們的mr程序相關運行參數,指定jar包 * 最後提交給yarn * @author Administrator */
public class WordcountDriver {
	public static void main(String[] args) throws Exception {
		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		// 8 配置提交到yarn上運行,windows和Linux變量不一致
// configuration.set("mapreduce.framework.name", "yarn");
// configuration.set("yarn.resourcemanager.hostname", "hadoop103");

		//有了Job對象才能在下面進行配置
		Job job = Job.getInstance(configuration);
		
		// 6 指定本程序的jar包所在的本地路徑
// job.setJar("/home/atguigu/wc.jar");
		job.setJarByClass(WordcountDriver.class);
		
		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);
		
		// 3 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 4 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 5 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		//輸出目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
// job.submit(); 能夠直接submit,也能夠寫成下面這行,等待完成
		boolean result = job.waitForCompletion(true);
		//退出若是成功返回0,若是失敗返回1
		System.exit(result?0:1);
	}
}

(4)將程序打成jar包,而後拷貝到hadoop集羣中。(上傳到hadoop_home目錄便可)
(5)啓動hadoop集羣
sbin/start-dfs.sh
sbin/start-yarn.sh
(6)執行wordcount程序
[atguigu@hadoop102 software]$ hadoop jar wc.jar com.atguigu.wordcount.WordcountDriver /user/atguigu/input /user/atguigu/output1
com.atguigu.wordcount.WordcountDriver 類的全路徑
/user/atguigu/input輸入路徑
/user/atguigu/output1輸出路徑

若是沒有報任何異常說明成功了
在這裏插入圖片描述
輸入目錄下有三個文件
在這裏插入圖片描述
下載下來,進行查看
在這裏插入圖片描述
這裏顯示的就是上面的三個輸入文件的裏面出現的全部的單詞的個數

3.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner)

0)分析
在這裏插入圖片描述
1)自定義分區
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{

@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
	
	// 1 獲取單詞key  
	String firWord = key.toString().substring(0, 1);
	char[] charArray = firWord.toCharArray();
	int result = charArray[0];
	// int result  = key.toString().charAt(0);

	// 2 根據奇數偶數分區
	if (result % 2 == 0) {
		return 0;
	}else {
		return 1;
	}
}

}
2)在驅動中配置加載分區,設置reducetask個數
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);
3.1.3 需求3:對每個maptask的輸出局部彙總(Combiner)
0)需求:統計過程當中對每個maptask的輸出進行局部彙總,以減少網絡傳輸量即採用Combiner功能。
在這裏插入圖片描述
1)數據準備:
在這裏插入圖片描述
方案一
1)增長一個WordcountCombiner類繼承Reducer
package com.atguigu.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
		Context context) throws IOException, InterruptedException {

	int count = 0;
	for(IntWritable v :values){
		count = v.get();
	}
	
	context.write(key, new IntWritable(count));
}

}
2)在WordcountDriver驅動類中指定combiner
// 9 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯
job.setCombinerClass(WordcountCombiner.class);
方案二
1)將WordcountReducer做爲combiner在WordcountDriver驅動類中指定
// 9 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯
job.setCombinerClass(WordcountReducer.class);

運行程序

在這裏插入圖片描述

3.1.4 需求4:大量小文件的切片優化(CombineTextInputFormat)

0)需求:將輸入的大量小文件合併成一個切片統一處理。
1)輸入數據:準備5個小文件
2)實現過程
(1)不作任何處理,運行需求1中的wordcount程序,觀察切片個數爲5

(2)在WordcountDriver中增長以下代碼,運行程序,並觀察運行的切片個數爲1
// 9 若是不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

(3)注意:若是eclipse打印不出日誌,在控制檯上只顯示
1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
2.log4j:WARN Please initialize the log4j system properly.
3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
須要在項目的src目錄下,新建一個文件,命名爲「log4j.properties」,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.2 流量彙總程序案例

3.2.1 需求1:統計手機號耗費的總上行流量、下行流量、總流量(序列化)

1)需求:
統計每個手機號耗費的總上行流量、下行流量、總流量
2)數據準備
在這裏插入圖片描述
在這裏插入圖片描述

在這裏插入圖片描述

3)分析
基本思路:
Map階段:
(1)讀取一行數據,切分字段
(2)抽取手機號、上行流量、下行流量
(3)以手機號爲key,bean對象爲value輸出,即context.write(手機號,bean);
Reduce階段:
(1)累加上行流量和下行流量獲得總流量。
(2)實現自定義的bean來封裝流量信息,並將bean做爲map輸出的key來傳輸
(3)MR程序在處理數據的過程當中會對數據排序(map輸出的kv對傳輸到reduce以前,會排序),排序的依據是map輸出的key
因此,咱們若是要實現本身須要的排序規則,則能夠考慮將排序因素放到key中,讓key實現接口:WritableComparable。
而後重寫key的compareTo方法。
4)編寫mapreduce程序
(1)編寫流量統計的bean對象

package com.atguigu.mr.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// bean對象要實例化
public class FlowBean implements Writable {

	private long upFlow;
	private long downFlow;
	private long sumFlow;

	// 反序列化時,須要反射調用空參構造函數,因此必須有
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	/** * 序列化方法 * * @param out * @throws IOException */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	/** * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致 * * @param in * @throws IOException */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
}

(2)編寫mapreduce主程序

package com.atguigu.mr.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {
//輸出的value是FlowBean
	static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// 1 將一行內容轉成string
			String ling = value.toString();

			// 2 切分字段
			String[] fields = ling.split("\t");//\t是tab鍵

			// 3 取出手機號碼
			String phoneNum = fields[1];

			// 4 取出上行流量和下行流量
			long upFlow = Long.parseLong(fields[fields.length - 3]);
			long downFlow = Long.parseLong(fields[fields.length - 2]);

			// 5 寫出數據
			context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
		}
	}

	static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context)
				throws IOException, InterruptedException {
			long sum_upFlow = 0;
			long sum_downFlow = 0;

			// 1 遍歷所用bean,將其中的上行流量,下行流量分別累加
			for (FlowBean bean : values) {
				sum_upFlow += bean.getUpFlow();
				sum_downFlow += bean.getDownFlow();
			}

			// 2 封裝對象
			FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
			context.write(key, resultBean);
		}
	}

	public static void main(String[] args) throws Exception {
		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 6 指定本程序的jar包所在的本地路徑
		job.setJarByClass(FlowCount.class);

		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 3 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 4 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 5 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

(3)將程序打成jar包,而後拷貝到hadoop集羣中。
(4)啓動hadoop集羣
(5)執行flowcount程序
[atguigu@hadoop102 software]$ hadoop jar flowcount.jar com.atguigu.mr.flowsum.FlowCount /user/atguigu/flowcount/input/ /user/atguigu/flowcount/output
(6)查看結果
[atguigu@hadoop102 software]$ hadoop fs -cat /user/atguigu/flowcount/output/part-r-00000
13480253104 FlowBean [upFlow=180, downFlow=180, sumFlow=360]
13502468823 FlowBean [upFlow=7335, downFlow=110349, sumFlow=117684]
13560436666 FlowBean [upFlow=1116, downFlow=954, sumFlow=2070]
13560439658 FlowBean [upFlow=2034, downFlow=5892, sumFlow=7926]
13602846565 FlowBean [upFlow=1938, downFlow=2910, sumFlow=4848]
。。。

也能夠在eclipse中直接運行
到main類中run as ==> run configuration
在這裏插入圖片描述
輸入路徑和輸出路徑
點擊run
默認console中是沒有輸出的,引入log4j
在這裏插入圖片描述
查看結果
在這裏插入圖片描述
也能夠debug運行,打斷點,而後debug as

經過斷點調試,發現是執行map方法,若是還有下一行,就繼續執行,遍歷完,而後進入reduce的類中,執行reduce,每一行的電話號碼做爲一個key執行reduce,而後將每一個key裏面的值加起來,直到將全部的key遍歷完。

3.2.2 需求2:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(Partitioner)

0)需求:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)
1)數據準備
在這裏插入圖片描述
在這裏插入圖片描述
根據電話號的前三位,137 138 139認爲是不一樣的省份
2)分析
(1)Mapreduce中會將map輸出的kv對,按照相同key分組,而後分發給不一樣的reducetask。默認的分發規則爲:根據key的hashcode%reducetask數來分發
(2)若是要按照咱們本身的需求進行分組,則須要改寫數據分發(分組)組件Partitioner
自定義一個CustomPartitioner繼承抽象類:Partitioner
(3)在job驅動中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)
3)在需求1的基礎上,增長一個分區類

package com.atguigu.mr.partitioner;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/** * K2 V2 對應的是map輸出kv類型 * @author Administrator */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號碼的前三位
		String preNum = key.toString().substring(0, 3);
		
		int partition = 4;
		
		// 2 判斷是哪一個省
		if ("136".equals(preNum)) {
			partition = 0;
		}else if ("137".equals(preNum)) {
			partition = 1;
		}else if ("138".equals(preNum)) {
			partition = 2;
		}else if ("139".equals(preNum)) {
			partition = 3;
		}
		return partition;
	}
}

2)在驅動函數中增長自定義數據分區設置和reduce task設置

public static void main(String[] args) throws Exception {
		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 6 指定本程序的jar包所在的本地路徑
		job.setJarByClass(FlowCount.class);

		// 8 指定自定義數據分區
		job.setPartitionerClass(ProvincePartitioner.class);
		
		// 9 同時指定相應數量的reduce task
		job.setNumReduceTasks(5); 
		
		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 3 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 4 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 5 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}

上面主要是增長了兩行

// 8 指定自定義數據分區
		job.setPartitionerClass(ProvincePartitioner.class);
		
	// 9 同時指定相應數量的reduce task
		job.setNumReduceTasks(5);

3)將程序打成jar包,而後拷貝到hadoop集羣中。
4)啓動hadoop集羣
5)執行flowcountPartitionser程序

[atguigu@hadoop102 software]$ hadoop jar flowcountPartitionser.jar com.atguigu.mr.partitioner.FlowCount /user/atguigu/flowcount/input /user/atguigu/flowcount/output

6)查看結果

[atguigu@hadoop102 software]$ hadoop fs -lsr /
/user/atguigu/flowcount/output/part-r-00000
/user/atguigu/flowcount/output/part-r-00001
/user/atguigu/flowcount/output/part-r-00002
/user/atguigu/flowcount/output/part-r-00003
/user/atguigu/flowcount/output/part-r-00004

在這裏插入圖片描述
最後一個是各類的都有

3.2.3 需求3:將統計結果按照總流量倒序排序(排序)

0)需求
根據需求1產生的結果再次對總流量進行排序。
1)數據準備
在這裏插入圖片描述

在這裏插入圖片描述

2)分析
(1)把程序分兩步走,第一步正常統計總流量,第二步再把結果進行排序
(2)context.write(總流量,手機號)
(3)FlowBean實現WritableComparable接口重寫compareTo方法

@Override
public int compareTo(FlowBean o) {
	// 倒序排列,從大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

3)FlowBean對象在在需求1基礎上增長了比較功能

package com.atguigu.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

	private long upFlow;
	private long downFlow;
	private long sumFlow;

	// 反序列化時,須要反射調用空參構造函數,因此必須有
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	public void set(long upFlow, long downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	/** * 序列化方法 * @param out * @throws IOException */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	/** * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致 * @param in * @throws IOException */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	@Override
	public int compareTo(FlowBean o) {
		// 倒序排列,從大到小
		return this.sumFlow > o.getSumFlow() ? -1 : 1;
	}
}

多了一個倒序排序

@Override
	public int compareTo(FlowBean o) {
		// 倒序排列,從大到小
		return this.sumFlow > o.getSumFlow() ? -1 : 1;
	}

4)Map方法優化爲一個對象,reduce方法則直接輸出結果便可,驅動函數根據輸入輸出重寫配置便可。

package com.atguigu.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {
//由於後面要對總流量進行排序,因此這裏輸出的第一個參數是bean,第二個參數是手機號
	static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
		FlowBean bean = new FlowBean();
		Text v = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			// 1 拿到的是上一個統計程序輸出的結果,已是各手機號的總流量信息
			String line = value.toString();
			
			// 2 截取字符串並獲取電話號、上行流量、下行流量
			String[] fields = line.split("\t");
			String phoneNbr = fields[0];
			
			long upFlow = Long.parseLong(fields[1]);
			long downFlow = Long.parseLong(fields[2]);
			
			// 3 封裝對象
			bean.set(upFlow, downFlow);
			v.set(phoneNbr);
			
			// 4 輸出
			context.write(bean, v);
		}
	}
static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
		
		@Override
		protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			context.write(values.iterator().next(), bean);
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 6 指定本程序的jar包所在的本地路徑
		job.setJarByClass(FlowCountSort.class);

		// 2 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(FlowCountSortMapper.class);
		job.setReducerClass(FlowCountSortReducer.class);

		// 3 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 4 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 5 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		Path outPath = new Path(args[1]);
// FileSystem fs = FileSystem.get(configuration);
// if (fs.exists(outPath)) {
// fs.delete(outPath, true);
// }
		FileOutputFormat.setOutputPath(job, outPath);

		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

5)將程序打成jar包,而後拷貝到hadoop集羣中。
6)啓動hadoop集羣
7)執行flowcountsort程序

[atguigu@hadoop102 software]$ hadoop jar flowcountsort.jar com.atguigu.mr.sort.FlowCountSort /user/atguigu/flowcount/output /user/atguigu/flowcount/output_sort

8)查看結果

[atguigu@hadoop102 software]$ hadoop fs -cat /user/atguigu/flowcount/output_sort/part-r-00000
13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726238888	2481	24681	27162
13726230503	2481	24681	27162
18320173382	9531	2412	11943

輸入文件

在這裏插入圖片描述

輸出

在這裏插入圖片描述

3.3 求每一個訂單中最貴的商品(GroupingComparator)

1)需求
有以下訂單數據
訂單id 商品id 成交金額
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8
Order_0000003 Pdt_02 33.8
如今須要求出每個訂單中最貴的商品。
2)輸入數據
在這裏插入圖片描述
在這裏插入圖片描述

輸出數據預期:
在這裏插入圖片描述
在這裏插入圖片描述
3)分析
(1)利用「訂單id和成交金額」做爲key,能夠將map階段讀取到的全部訂單數據按照id分區,按照金額排序,發送到reduce。
(2)在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,而後取第一個便是最大值。
在這裏插入圖片描述
注意這裏的reduce方法,認爲訂單號同樣的是一個key,若是訂單號同樣仍是認爲是不一樣的key,就會在輸出的結果中所有輸出,而不是輸出第一個最大的,由於每一行都執行了一次reduce方法。
在這裏插入圖片描述
4)實現
定義訂單信息OrderBean

package com.atguigu.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

	private String orderId;
	private double price;

	public OrderBean() {
		super();
	}

	public OrderBean(String orderId, double price) {
		super();
		this.orderId = orderId;
		this.price = price;
	}

	public String getOrderId() {
		return orderId;
	}

	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.orderId = in.readUTF();
		this.price = in.readDouble();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(orderId);
		out.writeDouble(price);
	}

	@Override
	public int compareTo(OrderBean o) {
		// 1 先按訂單id排序(從小到大)
		//若是但願是倒序的話,-this.xxxxx
		int result = this.orderId.compareTo(o.getOrderId());
		

		if (result == 0) {
			// 2 再按金額排序(從大到小)
			result = price > o.getPrice() ? -1 : 1;
		}

		return result;
	}
	@Override
	//toString決定了最後輸出文件的格式
	public String toString() {
		return orderId + "\t" + price ;
	}
}

編寫OrderParitiner分區流程

//value是null
public class OrderPatitioner extends Partitoiner<OrderBean,NullWritable>{
	@Override
	//按照key的orderid的hashCode值分區
	return(key,getOrderId().hashCode()&Integer.MaxValue)%numPartitions;
}

編寫OrderSortMapper處理流程

package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
	OrderBean bean = new OrderBean();
	
	@Override
	protected void map(LongWritable key, Text value,
			Context context)throws IOException, InterruptedException {
		// 1 獲取一行數據
		String line = value.toString();
		
		// 2 截取字段
		String[] fields = line.split("\t");
		//Order_0000002 Pdt_03 522.8
		// 3 封裝bean
		bean.setOrderId(fields[0]);
		bean.setPrice(Double.parseDouble(fields[2]));
		
		// 4 寫出
		context.write(bean, NullWritable.get());
	}
}

編寫OrderSortReducer處理流程

package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
	@Override
	protected void reduce(OrderBean bean, Iterable<NullWritable> values,
			Context context) throws IOException, InterruptedException {
		// 直接寫出
		context.write(bean, NullWritable.get());
	}
}

編寫OrderSortDriver處理流程

package com.atguigu.mapreduce.order;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderSortDriver {

	public static void main(String[] args) throws Exception {
		// 1 獲取配置信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2 設置jar包加載路徑
		job.setJarByClass(OrderSortDriver.class);

		// 3 加載map/reduce類
		job.setMapperClass(OrderSortMapper.class);
		job.setReducerClass(OrderSortReducer.class);

		// 4 設置map輸出數據key和value類型
		job.setMapOutputKeyClass(OrderBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 5 設置最終輸出數據的key和value類型
		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 設置輸入數據和輸出數據路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 10 設置reduce端的分組
		job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
		
		// 7 設置分區
		job.setPartitionerClass(OrderSortPartitioner.class);
		
		// 8 設置reduce個數
		job.setNumReduceTasks(3);

		// 9 提交
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

編寫OrderSortPartitioner處理流程

package com.atguigu.mapreduce.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{

	@Override
	public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {
		
		return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

編寫OrderSortGroupingComparator處理流程

package com.atguigu.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderSortGroupingComparator extends WritableComparator {

	//寫一個空參構造
	protected OrderSortGroupingComparator() {
		super(OrderBean.class, true);
	}
	
	//重寫比較的方法
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		
		OrderBean abean = (OrderBean) a;
		OrderBean bbean = (OrderBean) b;
		
		// 將orderId相同的bean都視爲一組
		return abean.getOrderId().compareTo(bbean.getOrderId());
	}
}

若是不加分組的話:都顯示出來了
在這裏插入圖片描述
沒有分組,將每一行都認爲是一組,分組以後會將文件中全部的認爲是一組。

在這裏插入圖片描述
設置了分組以後:
在這裏插入圖片描述
由於設置分組以前認爲每一行都是一個分組,每一行執行一次reduce方法,設置分組以後,認爲相同的OrderId的都是一組,只執行一次reduce方法。

代碼分析
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述

3.4 MapReduce中多表合併案例

1)需求:
訂單數據表t_order:
id pid amount
1001 01 1
1002 02 2
1003 03 3
在這裏插入圖片描述
在這裏插入圖片描述
商品信息表t_product
id pname
01 小米
02 華爲
03 格力
在這裏插入圖片描述
將商品信息表中數據根據商品id合併到訂單數據表中。
最終數據形式:
id pname amount
1001 小米 1
1001 小米 1
1002 華爲 2
1002 華爲 2
1003 格力 3
1003 格力 3

將pd表中的name添加到order表中

3.4.1 需求1:reduce端表合併(數據傾斜)

經過將關聯條件做爲map輸出的key,將兩表知足join條件的數據並攜帶數據所來源的文件信息,發往同一個reduce task,在reduce中進行數據的串聯。
在這裏插入圖片描述
1)建立商品和訂合併後的bean類

package com.atguigu.mapreduce.table;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TableBean implements Writable {
	private String order_id; // 訂單id
	private String p_id; // 產品id
	private int amount; // 產品數量
	private String pname; // 產品名稱
	private String flag;// 表的標記

	public TableBean() {
		super();
	}

	public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
		super();
		this.order_id = order_id;
		this.p_id = p_id;
		this.amount = amount;
		this.pname = pname;
		this.flag = flag;
	}

	public String getFlag() {
		return flag;
	}

	public void setFlag(String flag) {
		this.flag = flag;
	}

	public String getOrder_id() {
		return order_id;
	}

	public void setOrder_id(String order_id) {
		this.order_id = order_id;
	}

	public String getP_id() {
		return p_id;
	}

	public void setP_id(String p_id) {
		this.p_id = p_id;
	}

	public int getAmount() {
		return amount;
	}

	public void setAmount(int amount) {
		this.amount = amount;
	}

	public String getPname() {
		return pname;
	}

	public void setPname(String pname) {
		this.pname = pname;
	}

//Read和Write是爲了序列化和反序列化
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(order_id);
		out.writeUTF(p_id);
		out.writeInt(amount);
		out.writeUTF(pname);
		out.writeUTF(flag);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.order_id = in.readUTF();
		this.p_id = in.readUTF();
		this.amount = in.readInt();
		this.pname = in.readUTF();
		this.flag = in.readUTF();
	}

	@Override
	public String toString() {
		return order_id + "\t" + p_id + "\t" + amount + "\t" ;
	}
}

2)編寫TableMapper程序

package com.atguigu.mapreduce.table;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
	TableBean bean = new TableBean();
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		// 1 獲取輸入文件類型
		//獲取切片FileSplit其實原來是InputSplit,由於這裏是文件,因此直接用FileSplit
		FileSplit split = (FileSplit) context.getInputSplit();
		//切片裏面存摺文件名稱,這裏獲得的是order.txt或者pd.txt
		String name = split.getPath().getName();
		
		// 2 獲取輸入數據
		String line = value.toString();
		
		// 3 不一樣文件分別處理
		//沒有的值給默認值,不能爲空
		if (name.startsWith("order")) {// 訂單表處理
			// 3.1 切割
			String[] fields = line.split(",");
			
			// 3.2 封裝bean對象
			bean.setOrder_id(fields[0]);
			bean.setP_id(fields[1]);
			bean.setAmount(Integer.parseInt(fields[2]));
			bean.setPname("");
			bean.setFlag("0");
			
			k.set(fields[1]);
		}else {// 產品表處理
			// 3.3 切割
			String[] fields = line.split(",");
			
			// 3.4 封裝bean對象
			bean.setP_id(fields[0]);
			bean.setPname(fields[1]);
			bean.setFlag("1");
			bean.setAmount(0);
			bean.setOrder_id("");
			
			k.set(fields[0]);
		}
		// 4 寫出
		context.write(k, bean);
	}
}

3)編寫TableReducer程序

package com.atguigu.mapreduce.table;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<TableBean> values, Context context)
			throws IOException, InterruptedException {

		// 1準備存儲訂單的集合
		ArrayList<TableBean> orderBeans = new ArrayList<>();
		// 2 準備bean對象
		TableBean pdBean = new TableBean();

		for (TableBean bean : values) {

			if ("0".equals(bean.getFlag())) {// 訂單表
				// 拷貝傳遞過來的每條訂單數據到集合中
				TableBean orderBean = new TableBean();
				try {
					BeanUtils.copyProperties(orderBean, bean);
				} catch (Exception e) {
					e.printStackTrace();
				}

				orderBeans.add(orderBean);
			} else {// 產品表
				try {
					// 拷貝傳遞過來的產品表到內存中
					BeanUtils.copyProperties(pdBean, bean);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}

		// 3 表的拼接
		for(TableBean bean:orderBeans){
			bean.setP_id(pdBean.getPname());
			
			// 4 數據寫出去
			context.write(bean, NullWritable.get());
		}
	}
}

4)編寫TableDriver程序

package com.atguigu.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {

	public static void main(String[] args) throws Exception {
		// 1 獲取配置信息,或者job對象實例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 指定本程序的jar包所在的本地路徑
		job.setJarByClass(TableDriver.class);

		// 3 指定本業務job要使用的mapper/Reducer業務類
		job.setMapperClass(TableMapper.class);
		job.setReducerClass(TableReducer.class);

		// 4 指定mapper輸出數據的kv類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TableBean.class);

		// 5 指定最終輸出的數據的kv類型
		job.setOutputKeyClass(TableBean.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 指定job的輸入原始文件所在目錄
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

3)運行程序查看結果

1001	小米	1	
1001	小米	1	
1002	華爲	2	
1002	華爲	2	
1003	格力	3	
1003	格力	3

缺點:這種方式中,合併的操做是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜
解決方案: map端實現數據合併

3.4.2 需求2:map端表合併(Distributedcache)

1)分析
適用於關聯表中有小表的情形;
能夠將小表分發到全部的map節點,這樣,map節點就能夠在本地對本身所讀到的大表數據進行合併並輸出最終結果,能夠大大提升合併操做的併發度,加快處理速度。
在這裏插入圖片描述
2)實操案例
(1)先在驅動模塊中添加緩存文件

package test;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

	public static void main(String[] args) throws Exception {
		// 1 獲取job信息
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 設置加載jar包路徑
		job.setJarByClass(DistributedCacheDriver.class);

		// 3 關聯map
		job.setMapperClass(DistributedCacheMapper.class);
		
		// 4 設置最終輸出數據類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 5 設置輸入輸出路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 6 加載緩存數據
		job.addCacheFile(new URI("file:/e:/cache/pd.txt"));
		
		// 7 map端join的邏輯不須要reduce階段,設置reducetask數量爲0
		job.setNumReduceTasks(0);

		// 8 提交
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

(2)讀取緩存的文件數據

package test;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

	Map<String, String> pdMap = new HashMap<>();
	
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		// 1 獲取緩存的文件
		BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
		
		String line;
		while(StringUtils.isNotEmpty(line = reader.readLine())){
			// 2 切割 01 小米
			String[] fields = line.split("\t");
			
			// 3 緩存數據到集合
			pdMap.put(fields[0], fields[1]);
		}
		
		// 4 關流
		reader.close();
	}
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
			//由於上面已經緩存了pd.txt,因此這裏只須要讀取order.txt,因此,
			//將文件夾(未來在main裏面須要當作輸入參數的)中只剩下order.txt,
			//這樣map就只讀取order.txt了
		// 1 獲取一行
		String line = value.toString();
		
		// 2 截取 1001 01 4 ,以後拿到01去上面的map中找到對應的值
		String[] fields = line.split("\t");
		
		// 3 獲取訂單id
		String orderId = fields[1];
		
		// 4 獲取商品名稱
		String pdName = pdMap.get(orderId);
		
		// 5 拼接
		k.set(line + "\t"+ pdName);
		
		// 6 寫出
		context.write(k, NullWritable.get());
	}
}

3.5 小文件處理(自定義InputFormat)

1)需求
不管hdfs仍是mapreduce,對於小文件都有損效率,實踐中,又不免面臨處理大量小文件的場景,此時,就須要有相應解決方案。將多個小文件合併成一個文件SequenceFile,SequenceFile裏面存儲着多個文件,存儲的形式爲文件路徑+名稱爲key,文件內容爲value。
2)輸入數據
在這裏插入圖片描述
最終預期文件格式:
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述
這種方式文件中會使亂碼,可是都能讀取
3)分析
小文件的優化無非如下幾種方式:
(1)在數據採集的時候,就將小文件或小批數據合成大文件再上傳HDFS
(2)在業務處理以前,在HDFS上使用mapreduce程序對小文件進行合併
(3)在mapreduce處理時,可採用CombineTextInputFormat提升效率
4)具體實現
本節採用自定義InputFormat的方式,處理輸入小文件的問題。
(1)自定義一個InputFormat
(2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV
(3)在輸出時使用SequenceFileOutPutFormat輸出合併文件
5)程序實現:
(1)自定義InputFromat

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{

	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
	
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// 1 定義一個本身的recordReader
		WholeRecordReader recordReader = new WholeRecordReader();
		
		// 2 初始化recordReader,調用初始化方法 
		recordReader.initialize(split, context);
		
		return recordReader;
	}
}

(2)自定義RecordReader

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {
	private FileSplit split;
	private Configuration configuration;

	private BytesWritable value = new BytesWritable();
	private boolean processed = false;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		// 獲取傳遞過來的數據
		//獲取切片信息
		this.split = (FileSplit) split;
		//獲取配置信息
		configuration = context.getConfiguration();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if (!processed) {
			// 1 定義緩存
			byte[] contents = new byte[(int) split.getLength()];

			// 2 獲取文件系統
			//獲取切片路徑
			Path path = split.getPath();
			FileSystem fs = path.getFileSystem(configuration);

			// 3 讀取內容
			FSDataInputStream fis = null;
			try {
				// 3.1 打開輸入流
				//獲取切片輸入流
				fis = fs.open(path);
				
				// 3.2 讀取文件內容
				//就是一次性把文件讀完
				IOUtils.readFully(fis, contents, 0, contents.length);
				
				// 3.3 輸出文件內容
				//設置輸出
				value.set(contents, 0, contents.length);
			} catch (Exception e) {

			} finally {
				IOUtils.closeStream(fis);
			}
			
			processed = true;
			
			return true;
		}
		
		return false;
	}

	@Override
	public NullWritable getCurrentKey() throws IOException, InterruptedException {
		
		return NullWritable.get();
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return processed?1:0;
	}

	@Override
	public void close() throws IOException {

	}
	//是否能夠切割,這裏的是否能夠切割的意思是單個文件是否切割,而不是上面的三個文件是否能夠切割
	//也就是好比若是大於200m,就將四三個文件切割,這裏的文件就變成了四個。
	@Override
	protected boolean isSplitable(JoContext contexr,Path filename){
		return false;
	}
}

(3)InputFormatDriver處理流程

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class InputFormatDriver {

	static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
		private Text filenameKey;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// 獲取切片信息
			InputSplit split = context.getInputSplit();
			// 獲取切片路徑
			Path path = ((FileSplit) split).getPath();
			// 根據切片路徑獲取文件名稱
			filenameKey = new Text(path.toString());
		}

		@Override
		protected void map(NullWritable key, BytesWritable value, Context context)
				throws IOException, InterruptedException {
			// 文件名稱爲key
			context.write(filenameKey, value);
		}
	}

	public static void main(String[] args) throws Exception {
		args = new String[] { "e:/input", "e:/output11" };

		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(InputFormatDriver.class);

		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);

		job.setMapperClass(SequenceFileMapper.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);

		System.exit(result ? 0 : 1);
	}
}

3.6 過濾日誌及自定義日誌輸出路徑(自定義OutputFormat)

1)需求
過濾輸入的log日誌中是否包含atguigu
(1)包含atguigu的網站輸出到e:/atguigu.log
(2)不包含atguigu的網站輸出到e:/other.log
2)輸入數據
在這裏插入圖片描述
在這裏插入圖片描述
輸出預期:
在這裏插入圖片描述
3)具體程序:
(1)自定義一個outputformat

package com.atguigu.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		// 建立一個RecordWriter
		return new FilterRecordWriter(job);
	}
}

(2)具體的寫數據RecordWriter

package com.atguigu.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream atguiguOut = null;
	FSDataOutputStream otherOut = null;

	public FilterRecordWriter(TaskAttemptContext job) {
		// 1 獲取文件系統
		FileSystem fs;

		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 建立輸出文件路徑
			Path atguiguPath = new Path("e:/atguigu.log");
			Path otherPath = new Path("e:/other.log");

			// 3 建立輸出流
			atguiguOut = fs.create(atguiguPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {

		// 判斷是否包含「atguigu」輸出到不一樣文件
		if (key.toString().contains("atguigu")) {
			atguiguOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// 關閉資源
		if (atguiguOut != null) {
			atguiguOut.close();
		}
		
		if (otherOut != null) {
			otherOut.close();
		}
	}
}

(3)編寫FilterMapper

package com.atguigu.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1 獲取一行
		String line = value.toString();
		
		k.set(line);
		
		// 3 寫出
		context.write(k, NullWritable.get());
	}
}

(4)編寫FilterReducer
在這裏插入圖片描述
默認是連在一塊兒的,因此處理一下

package com.atguigu.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
//在key上加上回車和換行符。
		String k = key.toString();
		k = k + "\r\n";

		context.write(new Text(k), NullWritable.get());
	}
}

(5)編寫FilterDriver

package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 要將自定義的輸出格式組件設置到job中
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 雖然咱們自定義了outputformat,可是由於咱們的outputformat繼承自fileoutputformat
		// 而fileoutputformat要輸出一個_SUCCESS文件,因此,在這還得指定一個輸出目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

在這裏插入圖片描述

3.7 日誌清洗(數據清洗)

3.7.1 簡單解析版

1)需求:
去除日誌中字段長度小於等於11的日誌。
2)輸入數據
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述
3)實現代碼:
(1)編寫LogMapper

package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		// 1 獲取1行數據
		String line = value.toString();
		
		// 2 解析日誌
		
		boolean result = parseLog(line,context);
		
		// 3 日誌不合法退出
		if (!result) {
			return;
		}
		
		// 4 設置key
		k.set(line);
		
		// 5 寫出數據
		context.write(k, NullWritable.get());
	}

	// 2 解析日誌
	private boolean parseLog(String line, Context context) {
		// 1 截取
		String[] fields = line.split(" ");
		
		// 2 日誌長度大於11的爲合法
		if (fields.length > 11) {
			// 系統計數器
			//計數器相關查看2.8 
			context.getCounter("map", "true").increment(1);
			return true;
		}else {
			context.getCounter("map", "false").increment(1);
			return false;
		}
	}
}

(2)編寫LogDriver

package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

	public static void main(String[] args) throws Exception {
		// 1 獲取job信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2 加載jar包
		job.setJarByClass(LogDriver.class);

		// 3 關聯map
		job.setMapperClass(LogMapper.class);

		// 4 設置最終輸出類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 5 設置輸入和輸出路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 6 提交
		job.waitForCompletion(true);
	}
}

在這裏插入圖片描述
在這裏插入圖片描述
能夠看到日誌的行數明顯變少了。
在這裏插入圖片描述
能夠看到上面第一的組名爲map的計數器中的true和false的累加。
注意上面說的長度小於十一的日誌,這裏的長度指的是line.split(" ")的長度。

3.7.2 複雜解析版

1)需求:
對web訪問日誌中的各字段識別切分
去除日誌中不合法的記錄
根據統計需求,生成各種訪問請求過濾數據
2)輸入數據
在這裏插入圖片描述
3)實現代碼:
(1)定義一個bean,用來記錄日誌數據中的各數據字段
package com.atguigu.mapreduce.log;

public class LogBean {
private String remote_addr;// 記錄客戶端的ip地址
private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"
private String time_local;// 記錄訪問時間與時區
private String request;// 記錄請求的url與http協議
private String status;// 記錄請求狀態;成功是200
private String body_bytes_sent;// 記錄發送給客戶端文件主體內容大小
private String http_referer;// 用來記錄從那個頁面連接訪問過來的
private String http_user_agent;// 記錄客戶瀏覽器的相關信息

private boolean valid = true;// 判斷數據是否合法

public String getRemote_addr() {
	return remote_addr;
}

public void setRemote_addr(String remote_addr) {
	this.remote_addr = remote_addr;
}

public String getRemote_user() {
	return remote_user;
}

public void setRemote_user(String remote_user) {
	this.remote_user = remote_user;
}

public String getTime_local() {
	return time_local;
}

public void setTime_local(String time_local) {
	this.time_local = time_local;
}

public String getRequest() {
	return request;
}

public void setRequest(String request) {
	this.request = request;
}

public String getStatus() {
	return status;
}

public void setStatus(String status) {
	this.status = status;
}

public String getBody_bytes_sent() {
	return body_bytes_sent;
}

public void setBody_bytes_sent(String body_bytes_sent) {
	this.body_bytes_sent = body_bytes_sent;
}

public String getHttp_referer() {
	return http_referer;
}

public void setHttp_referer(String http_referer) {
	this.http_referer = http_referer;
}

public String getHttp_user_agent() {
	return http_user_agent;
}

public void setHttp_user_agent(String http_user_agent) {
	this.http_user_agent = http_user_agent;
}

public boolean isValid() {
	return valid;
}

public void setValid(boolean valid) {
	this.valid = valid;
}

@Override
public String toString() {
	StringBuilder sb = new StringBuilder();
	sb.append(this.valid);
	sb.append("\001").append(this.remote_addr);
	sb.append("\001").append(this.remote_user);
	sb.append("\001").append(this.time_local);
	sb.append("\001").append(this.request);
	sb.append("\001").append(this.status);
	sb.append("\001").append(this.body_bytes_sent);
	sb.append("\001").append(this.http_referer);
	sb.append("\001").append(this.http_user_agent);
	
	return sb.toString();
}

}
(2)編寫LogMapper程序
package com.atguigu.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
	// 1 獲取1行
	String line = value.toString();
	
	// 2 解析日誌是否合法
	LogBean bean = pressLog(line);
	
	if (!bean.isValid()) {
		return;
	}
	
	k.set(bean.toString());
	
	// 3 輸出
	context.write(k, NullWritable.get());
}

// 解析日誌
private LogBean pressLog(String line) {
	LogBean logBean = new LogBean();
	
	// 1 截取
	String[] fields = line.split(" ");
	
	if (fields.length > 11) {
		// 2封裝數據
		logBean.setRemote_addr(fields[0]);
		logBean.setRemote_user(fields[1]);
		logBean.setTime_local(fields[3].substring(1));
		logBean.setRequest(fields[6]);
		logBean.setStatus(fields[8]);
		logBean.setBody_bytes_sent(fields[9]);
		logBean.setHttp_referer(fields[10]);
		
		if (fields.length > 12) {
			logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
		}else {
			logBean.setHttp_user_agent(fields[11]);
		}
		
		// 大於400,HTTP錯誤
		if (Integer.parseInt(logBean.getStatus()) >= 400) {
			logBean.setValid(false);
		}
	}else {
		logBean.setValid(false);
	}
	
	return logBean;
}

}
(3)編寫LogDriver程序
package com.atguigu.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {
public static void main(String[] args) throws Exception {
// 1 獲取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 加載jar包
	job.setJarByClass(LogDriver.class);

	// 3 關聯map
	job.setMapperClass(LogMapper.class);

	// 4 設置最終輸出類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(NullWritable.class);

	// 5 設置輸入和輸出路徑
	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	// 6 提交
	job.waitForCompletion(true);
}

}

3.8 倒排索引實戰(多job串聯)

0)需求:有大量的文本(文檔、網頁),須要創建搜索索引

在這裏插入圖片描述
在這裏插入圖片描述
上面的意思就是atguigu在a.txt中存了三次,b.txt中存了兩次。
(1)第一次預期輸出結果

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	 1
pingping--b.txt	3
pingping--c.txt	 1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

(2)第二次預期輸出結果

atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping	c.txt-->1	b.txt-->3	a.txt-->1	
ss	c.txt-->1	b.txt-->1	a.txt-->2

在這裏插入圖片描述

1)第一次處理
(1)第一次處理,編寫OneIndexMapper

package com.atguigu.mapreduce.index;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	Text k = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 1 獲取切片名稱
		FileSplit inputSplit = (FileSplit) context.getInputSplit();
		String name = inputSplit.getPath().getName();

		// 2 獲取1行
		String line = value.toString();

		// 3 截取
		String[] words = line.split(" ");

		// 4 把每一個單詞和切片名稱關聯起來
		for (String word : words) {
			k.set(word + "--" + name);
			
			context.write(k, new IntWritable(1));
		}
	}
}

(2)第一次處理,編寫OneIndexReducer

package com.atguigu.mapreduce.index;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		
		int count = 0;
		// 累加和
		for(IntWritable value: values){
			count +=value.get();
		}
		
		// 寫出
		context.write(key, new IntWritable(count));
	}
}

(3)第一次處理,編寫OneIndexDriver

package com.atguigu.mapreduce.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneIndexDriver {

	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		job.setJarByClass(OneIndexDriver.class);

		job.setMapperClass(OneIndexMapper.class);
		job.setReducerClass(OneIndexReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}

(4)查看第一次輸出結果

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	1
pingping--b.txt	3
pingping--c.txt	1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

2)第二次處理
(1)第二次處理,編寫TwoIndexMapper

package com.atguigu.mapreduce.index;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
	Text k = new Text();
	Text v = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		// 1 獲取1行數據
		String line = value.toString();
		
		// 2用「--」切割
		String[] fields = line.split("--");
		
		k.set(fields[0]);
		v.set(fields[1]);
		
		// 3 輸出數據
		context.write(k, v);
	}
}

(2)第二次處理,編寫TwoIndexReducer

package com.atguigu.mapreduce.index;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		// atguigu a.txt 3
		// atguigu b.txt 2
		// atguigu c.txt 2

		// atguigu c.txt-->2 b.txt-->2 a.txt-->3

		StringBuilder sb = new StringBuilder();

		for (Text value : values) {
			sb.append(value.toString().replace("\t", "-->") + "\t");
		}
		
		context.write(key, new Text(sb.toString()));
	}
}

(3)第二次處理,編寫TwoIndexDriver

package com.atguigu.mapreduce.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIndexDriver {

	public static void main(String[] args) throws Exception {
		Configuration config = new Configuration();
		Job job = Job.getInstance(config);

		job.setMapperClass(TwoIndexMapper.class);
		job.setReducerClass(TwoIndexReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

(4)第二次查看最終結果

atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping	c.txt-->1	b.txt-->3	a.txt-->1	
ss	c.txt-->1	b.txt-->1	a.txt-->2

3.9 找微信共同好友分析實戰

1)需求:
如下是微信的好友列表數據,冒號前是一個用,冒號後是該用戶的全部好友(數據中的好友關係是單向的)
在這裏插入圖片描述
求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?
2)需求分析:
先求出A、B、C、….等是誰的好友
第一次輸出結果
A I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,
第二次輸出結果
A-B E C
A-C D F
A-D E F
A-E D B C
A-F O B C D E
A-G F E C D
A-H E C D O
A-I O
A-J O B
A-K D C
A-L F E D
A-M E F
B-C A
B-D A E
B-E C
B-F E A C
B-G C E A
B-H A E C
B-I A
B-K C A
B-L E
B-M E
B-O A
C-D A F
C-E D
C-F D A
C-G D F A
C-H D A
C-I A
C-K A D
C-L D F
C-M F
C-O I A
D-E L
D-F A E
D-G E A F
D-H A E
D-I A
D-K A
D-L E F
D-M F E
D-O A
E-F D M C B
E-G C D
E-H C D
E-J B
E-K C D
E-L D
F-G D C A E
F-H A D O E C
F-I O A
F-J B O
F-K D C A
F-L E D
F-M E
F-O A
G-H D C E A
G-I A
G-K D A C
G-L D F E
G-M E F
G-O A
H-I O A
H-J O
H-K A C D
H-L D E
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M E F
3)代碼實現:
(1)第一次Mapper
package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
		throws IOException, InterruptedException {
	// 1 獲取一行 A:B,C,D,F,E,O
	String line = value.toString();
	
	// 2 切割
	String[] fileds = line.split(":");
	
	// 3 獲取person和好友
	String person = fileds[0];
	String[] friends = fileds[1].split(",");
	
	// 4寫出去
	for(String friend: friends){
		// 輸出 <好友,人>
		context.write(new Text(friend), new Text(person));
	}
}

}
(2)第一次Reducer
package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
	
	StringBuffer sb = new StringBuffer();
	//1 拼接
	for(Text person: values){
		sb.append(person).append(",");
	}
	
	//2 寫出
	context.write(key, new Text(sb.toString()));
}

}
(3)第一次Driver
package com.atguigu.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneShareFriendsDriver {

public static void main(String[] args) throws Exception {
	// 1 獲取job對象
	Configuration configuration = new Configuration();
	Job job = Job.getInstance(configuration);
	
	// 2 指定jar包運行的路徑
	job.setJarByClass(OneShareFriendsDriver.class);

	// 3 指定map/reduce使用的類
	job.setMapperClass(OneShareFriendsMapper.class);
	job.setReducerClass(OneShareFriendsReducer.class);
	
	// 4 指定map輸出的數據類型
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	
	// 5 指定最終輸出的數據類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	
	// 6 指定job的輸入原始所在目錄
	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));
	
	// 7 提交
	boolean result = job.waitForCompletion(true);
	
	System.exit(result?1:0);
}

}
(4)第二次Mapper
package com.atguigu.mapreduce.friends;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

@Override
protected void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
	// A I,K,C,B,G,F,H,O,D,
	// 友 人,人,人
	String line = value.toString();
	String[] friend_persons = line.split("\t");

	String friend = friend_persons[0];
	String[] persons = friend_persons[1].split(",");

	Arrays.sort(persons);

	for (int i = 0; i < persons.length - 1; i++) {
		
		for (int j = i + 1; j < persons.length; j++) {
			// 發出 <人-人,好友> ,這樣,相同的「人-人」對的全部好友就會到同1個reduce中去
			context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
		}
	}
}

}
(5)第二次Reducer
package com.atguigu.mapreduce.friends;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
	
	StringBuffer sb = new StringBuffer();

	for (Text friend : values) {
		sb.append(friend).append(" ");
	}
	
	context.write(key, new Text(sb.toString()));
}

}
(6)第二次Driver
package com.atguigu.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoShareFriendsDriver {

public static void main(String[] args) throws Exception {
	// 1 獲取job對象
	Configuration configuration = new Configuration();
	Job job = Job.getInstance(configuration);
	
	// 2 指定jar包運行的路徑
	job.setJarByClass(TwoShareFriendsDriver.class);

	// 3 指定map/reduce使用的類
	job.setMapperClass(TwoShareFriendsMapper.class);
	job.setReducerClass(TwoShareFriendsReducer.class);
	
	// 4 指定map輸出的數據類型
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	
	// 5 指定最終輸出的數據類型
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	
	// 6 指定job的輸入原始所在目錄
	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));
	
	// 7 提交
	boolean result = job.waitForCompletion(true);
	
	System.exit(result?1:0);
}

}

3.10 壓縮/解壓縮

3.10.1 對數據流的壓縮和解壓縮

CompressionCodec有兩個方法能夠用於輕鬆地壓縮或解壓縮數據。要想對正在被寫入一個輸出流的數據進行壓縮,咱們可使用 createOutputStream(OutputStreamout)方法建立一個CompressionOutputStream(未壓縮的數據將 被寫到此),將其以壓縮格式寫入底層的流。相反,要想對從輸入流讀取而來的數據進行解壓縮,則調用 createInputStream(InputStreamin)函數,從而得到一個CompressionInputStream,,從而從底層的流 讀取未壓縮的數據。
測試一下以下壓縮方式:
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZ4 org.apache.hadoop.io.compress.Lz4Codec

package com.atguigu.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

public static void main(String[] args) throws Exception, IOException {

// compress(「e:/test.txt」,「org.apache.hadoop.io.compress.BZip2Codec」);
decompres(「e:/test.txt.bz2」);
}

/*
 * 壓縮
 * filername:要壓縮文件的路徑
 * method:欲使用的壓縮的方法(org.apache.hadoop.io.compress.BZip2Codec)
 */
public static void compress(String filername, String method) throws ClassNotFoundException, IOException {
	
	// 1 建立壓縮文件路徑的輸入流
	File fileIn = new File(filername);
	InputStream in = new FileInputStream(fileIn);
	
	// 2 獲取壓縮的方式的類
	Class codecClass = Class.forName(method);
	
	Configuration conf = new Configuration();
	// 3 經過名稱找到對應的編碼/解碼器
	CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

	// 4 該壓縮方法對應的文件擴展名
	File fileOut = new File(filername + codec.getDefaultExtension());
	fileOut.delete();

	OutputStream out = new FileOutputStream(fileOut);
	CompressionOutputStream cout = codec.createOutputStream(out);

	// 5 流對接
	IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false); // 緩衝區設爲5MB

	// 6 關閉資源
	in.close();
	cout.close();
}

/*
 * 解壓縮
 * filename:但願解壓的文件路徑
 */
public static void decompres(String filename) throws FileNotFoundException, IOException {

	Configuration conf = new Configuration();
	CompressionCodecFactory factory = new CompressionCodecFactory(conf);
	
	// 1 獲取文件的壓縮方法
	CompressionCodec codec = factory.getCodec(new Path(filename));
	
	// 2 判斷該壓縮方法是否存在
	if (null == codec) {
		System.out.println("Cannot find codec for file " + filename);
		return;
	}

	// 3 建立壓縮文件的輸入流
	InputStream cin = codec.createInputStream(new FileInputStream(filename));
	
	// 4 建立解壓縮文件的輸出流
	File fout = new File(filename + ".decoded");
	OutputStream out = new FileOutputStream(fout);

	// 5 流對接
	IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);

	// 6 關閉資源
	cin.close();
	out.close();
}

}

3.10.2 在Map輸出端採用壓縮

即便你的MapReduce的輸入輸出文件都是未壓縮的文件,你仍然能夠對map任務的中間結果輸出作壓縮,由於它要寫在硬盤而且經過網絡傳輸到reduce節點,對其壓縮能夠提升不少性能,這些工做只要設置兩個屬性便可,咱們來看下代碼怎麼設置:
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

	Configuration configuration = new Configuration();

	// 開啓map端輸出壓縮
	configuration.setBoolean("mapreduce.map.output.compress", true);
	// 設置map端輸出壓縮方式
	configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);

	Job job = Job.getInstance(configuration);

	job.setJarByClass(WordCountDriver.class);

	job.setMapperClass(WordCountMapper.class);
	job.setReducerClass(WordCountReducer.class);

	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);

	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	boolean result = job.waitForCompletion(true);

	System.exit(result ? 1 : 0);
}

}
2)Mapper保持不變
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

@Override
protected void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
	
	String line = value.toString();
	
	String[] words = line.split(" ");
	
	for(String word:words){
		context.write(new Text(word), new IntWritable(1));
	}
}

}
3)Reducer保持不變
package com.atguigu.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
		Context context) throws IOException, InterruptedException {
	
	int count = 0;
	
	for(IntWritable value:values){
		count += value.get();
	}
	
	context.write(key, new IntWritable(count));
}

}

3.10.3 在Reduce輸出端採用壓縮

基於workcount案例處理
1)修改驅動
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	
	Configuration configuration = new Configuration();
	
	Job job = Job.getInstance(configuration);
	
	job.setJarByClass(WordCountDriver.class);
	
	job.setMapperClass(WordCountMapper.class);
	job.setReducerClass(WordCountReducer.class);
	
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);
	
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	
	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));
	
	// 設置reduce端輸出壓縮開啓
	FileOutputFormat.setCompressOutput(job, true);
	
	// 設置壓縮的方式
    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

boolean result = job.waitForCompletion(true);
	
	System.exit(result?1:0);
}

}
2)Mapper和Reducer保持不變(詳見3.10.2)

4、常見錯誤

1)導包容易出錯。尤爲Text. 2)Mapper中第一個輸入的參數必須是LongWritable或者NullWritable,不能夠是IntWritable. 報的錯誤是類型轉換異常。 3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),說明partition和reducetask個數沒對上,調整reducetask個數。 4)若是分區數不是1,可是reducetask爲1,是否執行分區過程。答案是:不執行分區過程。由於在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1確定不執行。

相關文章
相關標籤/搜索