Mapreduce是一個分佈式運算程序的編程框架,是用戶開發「基於hadoop的數據分析應用」的核心框架;前端
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分佈式運算程序,併發運行在一個hadoop集羣上。java
1)海量數據在單機上處理由於硬件資源限制,沒法勝任node
2)而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度算法
3)引入mapreduce框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理。數據庫
1)分佈式的運算程序每每須要分紅至少2個階段apache
2)第一個階段的maptask併發實例,徹底並行運行,互不相干編程
3)第二個階段的reduce task併發實例互不相干,可是他們的數據依賴於上一個階段的全部maptask併發實例的輸出windows
4)MapReduce編程模型只能包含一個map階段和一個reduce階段,若是用戶的業務邏輯很是複雜,那就只能多個mapreduce程序,串行運行緩存
一個完整的mapreduce程序在分佈式運行時有三類實例進程:安全
1)MrAppMaster:負責整個程序的過程調度及狀態協調
2)MapTask:負責map階段的整個數據處理流程
3)ReduceTask:負責reduce階段的整個數據處理流程
用戶編寫的程序分紅三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承本身的父類
(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask進程)對每個<K,V>調用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承本身的父類
(2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
3)Driver階段
整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象
4)案例實操
詳見3.1.1統計一堆文件中單詞出現的個數(WordCount案例)
1)在MapReduce程序讀取文件的輸入目錄上存放相應的文件。
2)客戶端程序在submit()方法執行前,獲取待處理的數據信息,而後根據集羣中參數的配置造成一個任務分配規劃。
3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啓動MRAppMaster。
4)MRAppMaster啓動後根據本次job的描述信息,計算出須要的maptask實例數量,而後向集羣申請機器啓動相應數量的maptask進程。
5)maptask利用客戶指定的inputformat來讀取數據,造成輸入KV對。
6)maptask將輸入KV對傳遞給客戶定義的map()方法,作邏輯運算
7)map()運算完畢後將KV對收集到maptask緩存。
8)maptask緩存中的KV對按照K分區排序後不斷寫到磁盤文件
9)MRAppMaster監控到全部maptask進程任務完成以後,會根據客戶指定的參數啓動相應數量的reducetask進程,並告知reducetask進程要處理的數據分區。
10)Reducetask進程啓動以後,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行從新歸併排序,而後按照相同key的KV爲一個組,調用客戶定義的reduce()方法進行邏輯運算。
11)Reducetask運算完畢後,調用客戶指定的outputformat將結果數據輸出到外部存儲。
序列化就是把內存中的對象,轉換成字節序列(或其餘數據傳輸協議)以便於存儲(持久化)和網絡傳輸。
反序列化就是將收到字節序列(或其餘數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化後,會附帶不少額外的信息(各類校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。因此,hadoop本身開發了一套序列化機制(Writable),精簡、高效。
經常使用的數據類型對應的hadoop數據序列化類型
Java類型 |
Hadoop Writable類型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
string |
Text |
map |
MapWritable |
array |
ArrayWritable |
1)自定義bean對象要想序列化傳輸,必須實現序列化接口,須要注意如下7項。
(1)必須實現Writable接口
(2)反序列化時,須要反射調用空參構造函數,因此必須有空參構造
(3)重寫序列化方法
(4)重寫反序列化方法
(5)注意反序列化的順序和序列化的順序徹底一致
(6)要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用
(7)若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序
// 1 必須實現Writable接口 public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; //2 反序列化時,須要反射調用空參構造函數,因此必須有 public FlowBean() { super(); } /** * 3重寫序列化方法 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 4 重寫反序列化方法 5 注意反序列化的順序和序列化的順序徹底一致 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } // 6要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } //7 若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序 @Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } }
2)案例
詳見3.2.1統計每個手機號耗費的總上行流量、下行流量、總流量(序列化)。
1)job提交流程源碼詳解
waitForCompletion() submit(); // 1創建鏈接 connect(); // 1)建立提交job的代理 new Cluster(getConfiguration()); // (1)判斷是本地yarn仍是遠程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)建立給集羣提交數據的Stag路徑 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)獲取jobid ,並建立job路徑 JobID jobId = submitClient.getNewJobID(); // 3)拷貝jar包到集羣 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規劃文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路徑寫xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交狀態 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2)FileInputFormat源碼解析(input.getSplits(job))
(1)找到你數據存儲的目錄。
(2)開始遍歷處理(規劃切片)目錄下的每個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M
c)默認狀況下,切片大小=blocksize
d)開始切,造成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就劃分一塊切片)
e)將切片信息寫到一個切片規劃文件中
f)整個切片的核心過程在getSplit()方法中完成。
g)數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分紅分片進行存儲。InputSplit只記錄了分片的元數據信息,好比起始位置、長度以及所在的節點列表等。
h)注意:block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分。
(4)提交切片規劃文件到yarn上,yarn上的MrAppMaster就能夠根據切片規劃文件計算開啓maptask個數。
3)FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內容長度進行切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集總體,而是逐個針對每個文件單獨切片
好比待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
通過FileInputFormat的切片機制運算後,造成的切片信息以下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
4)FileInputFormat切片大小的參數配置
(1)經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值爲1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
所以,默認狀況下,切片大小=blocksize。
maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值。
minsize (切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。
5)獲取切片信息API
// 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName();
關於大量小文件的優化策略
1)默認狀況下TextInputformat對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣若是有大量小文件,就會產生大量的maptask,處理效率極其低下。
2)優化策略
(1)最好的辦法,在數據處理系統的最前端(預處理/採集),將小文件先合併成大文件,再上傳到HDFS作後續分析。
(2)補救措施:若是已是大量小文件在HDFS中了,可使用另外一種InputFormat來作切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask。
(3)優先知足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現步驟
// 9 若是不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
4)案例
詳見3.1.4 需求4:大量小文件的切片優化(CombineTextInputFormat)。
1)概述
(1)自定義一個InputFormat
(2)改寫RecordReader,實現一次讀取一個完整文件封裝爲KV
(3)在輸出時使用SequenceFileOutPutFormat輸出合併文件
2)案例
詳見3.5小文件處理(自定義InputFormat)。
1)問題引出
maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,mapTask並行任務是否越多越好呢?
2)MapTask並行度決定機制
一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
3)MapTask工做機制
(1)Read階段:Map Task經過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,併產生一系列新的key/value。
(3)Collect階段:在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩衝區中。
(4)Spill階段:即「溢寫」,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,而後按照key進行排序。這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照key有序。
步驟2:按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。若是用戶設置了Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當期內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
(5)Combine階段:當全部數據處理完成後,MapTask對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。
當全部數據處理完後,MapTask會將全部臨時文件合併成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。
在進行文件合併過程當中,MapTask以分區爲單位進行合併。對於某個分區,它將採用多輪遞歸合併的方式。每輪合併io.sort.factor(默認100)個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。
讓每一個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
Mapreduce確保每一個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出做爲輸入傳給reducer)稱爲shuffle。
1)流程示意圖
2)流程詳解
上面的流程是整個mapreduce最全工做流程,可是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,以下:
1)maptask收集咱們的map()方法輸出的kv對,放到內存緩衝區中
2)從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合併成大的溢出文件
4)在溢出過程當中,及合併的過程當中,都要調用partitoner進行分組和針對key進行排序
5)reducetask根據本身的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不一樣maptask的結果文件,reducetask會將這些文件再進行合併(歸併排序)
7)合併成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩衝區大小會影響到mapreduce程序的執行效率,原則上說,緩衝區越大,磁盤io的次數越少,執行速度就越快。
緩衝區的大小能夠經過參數調整,參數:io.sort.mb 默認100M
0)問題引出:要求將統計結果按照條件輸出到不一樣文件中(分區)。好比:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)
1)默認partition分區
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
默認分區是根據key的hashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區
2)自定義Partitioner步驟
(1)自定義類繼承Partitioner,從新getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判斷是哪一個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
(2)在job驅動中,設置自定義partitioner:
job.setPartitionerClass(CustomPartitioner.class)
(3)自定義partition後,要根據自定義partitioner的邏輯設置相應數量的reduce task
job.setNumReduceTasks(5);
3)注意:
若是reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
若是1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
若是reduceTask的數量=1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數爲5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
4)案例
詳見3.2.2 需求2:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(Partitioner)
詳見3.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner)
排序是MapReduce框架中最重要的操做之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。
對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。
對於Reduce Task,它從每一個Map Task上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。
每一個階段的默認排序
1)排序的分類:
(1)部分排序:
MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每一個文件內部排序。
(2)全排序:
如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構。
替代方案:首先建立一系列排好序的文件;其次,串聯這些文件;最後,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。
2)自定義排序WritableComparable
(1)原理分析
bean對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
(2)案例
詳見3.2.3 需求3:將統計結果按照總流量倒序排序(排序)
1)對reduce階段的數據根據某一個或幾個字段進行分組。
2)案例
詳見3.3 求出每個訂單中最貴的商品(GroupingComparator)
1)combiner是MR程序中Mapper和Reducer以外的一種組件
2)combiner組件的父類就是Reducer
3)combiner和reducer的區別在於運行的位置:
Combiner是在每個maptask所在的節點運行
Reducer是接收全局全部Mapper的輸出結果;
4)combiner的意義就是對每個maptask的輸出進行局部彙總,以減少網絡傳輸量
5)自定義Combiner實現步驟:
(1)自定義一個combiner繼承Reducer,重寫reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
(2)在job中設置:
job.setCombinerClass(WordcountCombiner.class);
6)combiner可以應用的前提是不能影響最終的業務邏輯,並且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等於 (5+4)/2=9/2
7)案例
詳見3.1.3需求3:對每個maptask的輸出局部彙總(Combiner)
1)數據傾斜緣由
若是是多張表的操做都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。
2)案例:
詳見3.4.1 需求1:reduce端表合併(數據傾斜)
3)解決方案
在map端緩存多張表,提早處理業務邏輯,這樣增長map端業務,減小reduce端數據的壓力,儘量的減小數據傾斜。
4)具體辦法:採用distributedcache
(1)在mapper的setup階段,將文件讀取到緩存集合中
(2)在驅動函數中加載緩存。
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節點
5)案例:
詳見3.4.2需求2:map端表合併(Distributedcache)
1)設置ReduceTask
reducetask的並行度一樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不一樣,Reducetask數量的決定是能夠直接手動設置:
//默認值是1,手動設置爲4 job.setNumReduceTasks(4);
2)注意
(1)若是數據分佈不均勻,就有可能在reduce階段產生數據傾斜
(2)reducetask數量並非任意設置,還要考慮業務邏輯需求,有些狀況下,須要計算全局彙總結果,就只能有1個reducetask。
(3)具體多少個reducetask,須要根據集羣性能而定。
(4)若是分區數不是1,可是reducetask爲1,是否執行分區過程。答案是:不執行分區過程。由於在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1確定不執行。
4)ReduceTask工做機制
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,若是其大小超過必定閾值,則寫到磁盤上,不然直接放到內存中。
(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啓動了兩個後臺線程對內存和磁盤上的文件進行合併,以防止內存使用過多或磁盤上文件過多。
(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行彙集的一組數據。爲了將key相同的數據聚在一塊兒,Hadoop採用了基於排序的策略。因爲各個MapTask已經實現對本身的處理結果進行了局部排序,所以,ReduceTask只需對全部數據進行一次歸併排序便可。
(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。
1)概述
(1)要在一個mapreduce程序中根據數據的不一樣輸出兩類結果到不一樣目錄,這類靈活的輸出需求能夠經過自定義outputformat來實現。
(1)自定義outputformat,
(2)改寫recordwriter,具體改寫輸出數據的方法write()
2)案例:
詳見3.6 修改日誌內容及自定義日誌輸出路徑(自定義OutputFormat)。
壓縮技術可以有效減小底層存儲系統(HDFS)讀寫字節數。壓縮提升了網絡帶寬和磁盤空間的效率。在Hadood下,尤爲是數據規模很大和工做負載密集的狀況下,使用數據壓縮顯得很是重要。在這種狀況下,I/O操做和網絡數據傳輸要花大量的時間。還有,Shuffle與Merge過程一樣也面臨着巨大的I/O壓力。
鑑於磁盤I/O和網絡帶寬是Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤I/O和網絡傳輸很是有幫助。不過,儘管壓縮與解壓操做的CPU開銷不高,其性能的提高和資源的節省並不是沒有代價。
若是磁盤I/O和網絡帶寬影響了MapReduce做業性能,在任意MapReduce階段啓用壓縮均可以改善端到端處理時間並減小I/O和網絡流量。
壓縮mapreduce的一種優化策略:經過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減小磁盤IO,提升MR程序運行速度(但相應增長了cpu運算負擔)
注意:壓縮特性運用得當能提升性能,但運用不當也可能下降性能
基本原則:
(1)運算密集型的job,少用壓縮
(2)IO密集型的job,多用壓縮
爲了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,以下表所示
壓縮性能的比較
壓縮能夠在MapReduce做用的任意階段啓用。
1)輸入壓縮:
在有大量數據並計劃重複處理的狀況下,應該考慮對輸入進行壓縮。然而,你無須顯示指定使用的編解碼方式。Hadoop自動檢查文件擴展名,若是擴展名可以匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓。不然,Hadoop就不會使用任何編解碼器。
2)壓縮mapper輸出:
當map任務輸出的中間數據量很大時,應考慮在此階段採用壓縮技術。這能顯著改善內部數據Shuffle過程,而Shuffle過程在Hadoop處理過程當中是資源消耗最多的環節。若是發現數據量大形成網絡傳輸緩慢,應該考慮使用壓縮技術。可用於壓縮mapper輸出的快速編解碼器包括LZO、LZ4或者Snappy。
注:LZO是供Hadoop壓縮數據用的通用壓縮編解碼器。其設計目標是達到與硬盤讀取速度至關的壓縮速度,所以速度是優先考慮的因素,而不是壓縮率。與gzip編解碼器相比,它的壓縮速度是gzip的5倍,而解壓速度是gzip的2倍。同一個文件用LZO壓縮後比用gzip壓縮後大50%,但比壓縮前小25%~50%。這對改善性能很是有利,map階段完成時間快4倍。
3)壓縮reducer輸出:
在此階段啓用壓縮技術可以減小要存儲的數據量,所以下降所需的磁盤空間。當mapreduce做業造成做業鏈條時,由於第二個做業的輸入也已壓縮,因此啓用壓縮一樣有效。
要在Hadoop中啓用壓縮,能夠配置以下參數(mapred-site.xml文件中):
參數 |
默認值 |
階段 |
建議 |
io.compression.codecs (在core-site.xml中配置) |
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.Lz4Codec |
輸入壓縮 |
Hadoop使用文件擴展名判斷是否支持某種編解碼器 |
mapreduce.map.output.compress |
false |
mapper輸出 |
這個參數設爲true啓用壓縮 |
mapreduce.map.output.compress.codec |
org.apache.hadoop.io.compress.DefaultCodec |
mapper輸出 |
使用LZO、LZ4或snappy編解碼器在此階段壓縮數據 |
mapreduce.output.fileoutputformat.compress |
false |
reducer輸出 |
這個參數設爲true啓用壓縮 |
mapreduce.output.fileoutputformat.compress.codec |
org.apache.hadoop.io.compress. DefaultCodec |
reducer輸出 |
使用標準工具或者編解碼器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type |
RECORD |
reducer輸出 |
SequenceFile輸出使用的壓縮類型:NONE和BLOCK |
Hadoop爲每一個做業維護若干內置計數器,以描述多項指標。例如,某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量。
1)API
(1)採用枚舉的方式統計計數
enum MyCounter{MALFORORMED,NORMAL}
//對枚舉定義的自定義計數器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)採用計數器組、計數器名稱的方式統計
context.getCounter("counterGroup", "countera").increment(1);
組名和計數器名稱隨便起,但最好有意義。
(3)計數結果在程序運行後的控制檯上查看。
2)案例
詳見3.6 修改日誌內容及自定義日誌輸出路徑(自定義OutputFormat)。
1)概述
在運行Mapreduce程序以前,每每要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程每每只須要運行mapper程序,不須要運行reduce程序。
2)案例
詳見3.7 日誌清洗(數據清洗)。
Yarn是一個資源調度平臺,負責爲運算程序提供服務器運算資源,至關於一個分佈式的操做系統平臺,而mapreduce等運算程序則至關於運行於操做系統之上的應用程序
1)Yarn並不清楚用戶提交的程序的運行機制
2)Yarn只提供運算資源的調度(用戶程序向Yarn申請資源,Yarn就負責分配資源)
3)Yarn中的主管角色叫ResourceManager
4)Yarn中具體提供運算資源的角色叫NodeManager
5)這樣一來,Yarn其實就與運行的用戶程序徹底解耦,就意味着Yarn上能夠運行各類類型的分佈式運算程序(mapreduce只是其中的一種),好比mapreduce、storm程序,spark程序……
6)因此,spark、storm等運算框架均可以整合在Yarn上運行,只要他們各自的框架中有符合Yarn規範的資源請求機制便可
7)Yarn就成爲一個通用的資源調度平臺,今後,企業中之前存在的各類運算集羣均可以整合在一個物理集羣上,提升資源利用率,方便數據共享
1)Yarn運行機制
2)工做機制詳解
(0)Mr程序提交到客戶端所在的節點
(1)yarnrunner向Resourcemanager申請一個application。
(2)rm將該應用程序的資源路徑返回給yarnrunner
(3)該程序將運行所需資源提交到HDFS上
(4)程序資源提交完畢後,申請運行mrAppMaster
(5)RM將用戶的請求初始化成一個task
(6)其中一個NodeManager領取到task任務。
(7)該NodeManager建立容器Container,併產生MRAppmaster
(8)Container從HDFS上拷貝資源到本地
(9)MRAppmaster向RM 申請運行maptask容器
(10)RM將運行maptask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並建立容器。
(11)MR向兩個接收到任務的NodeManager發送程序啓動腳本,這兩個NodeManager分別啓動maptask,maptask對數據分區排序。
(12)MRAppmaster向RM申請2個容器,運行reduce task。
(13)reduce task向maptask獲取相應分區的數據。
(14)程序運行完畢後,MR會向RM註銷本身。
mapreduce在編程的時候,基本上一個固化的模式,沒有太多可靈活改變的地方,除了如下幾處:
1)輸入數據接口:InputFormat--->FileInputFormat(文件類型數據讀取的通用抽象類) DBInputFormat (數據庫數據讀取的通用抽象類)
默認使用的實現類是:TextInputFormat
job.setInputFormatClass(TextInputFormat.class)
TextInputFormat的功能邏輯是:一次讀一行文本,而後將該行的起始偏移量做爲key,行內容做爲value返回
2)邏輯處理接口: Mapper
徹底須要用戶本身去實現其中:map() setup() clean()
3)map輸出的結果在shuffle階段會被partition以及sort,此處有兩個接口可自定義:
(1)Partitioner
有默認實現 HashPartitioner,邏輯是 根據key和numReduces來返回一個分區號; key.hashCode()&Integer.MAXVALUE % numReduces
一般狀況下,用默認的這個HashPartitioner就能夠,若是業務上有特別的需求,能夠自定義
(2)Comparable
當咱們用自定義的對象做爲key來輸出時,就必需要實現WritableComparable接口,override其中的compareTo()方法
4)reduce端的數據分組比較接口:Groupingcomparator
reduceTask拿到輸入數據(一個partition的全部數據)後,首先須要對數據進行分組,其分組的默認原則是key相同,而後對每一組kv數據調用一次reduce()方法,而且將這一組kv中的第一個kv的key做爲參數傳給reduce的key,將這一組數據的value的迭代器傳給reduce()的values參數
利用上述這個機制,咱們能夠實現一個高效的分組取最大值的邏輯:
自定義一個bean對象用來封裝咱們的數據,而後改寫其compareTo方法產生倒序排序的效果
而後自定義一個Groupingcomparator,將bean對象的分組邏輯改爲按照咱們的業務分組id來分組(好比訂單號)
這樣,咱們要取的最大值就是reduce()方法中傳進來key
5)邏輯處理接口:Reducer
徹底須要用戶本身去實現其中 reduce() setup() clean()
6)輸出數據接口:OutputFormat---> 有一系列子類FileOutputformat DBoutputFormat .....
默認實現類是TextOutputFormat,功能邏輯是:將每個KV對向目標文本文件中輸出爲一行
1)如下參數是在用戶本身的mr應用程序中配置就能夠生效
2)應該在yarn啓動以前就配置在服務器的配置文件中才能生效
配置參數 |
參數說明 |
yarn.scheduler.minimum-allocation-mb 1024 |
給應用程序container分配的最小內存 |
yarn.scheduler.maximum-allocation-mb 8192 |
給應用程序container分配的最大內存 |
yarn.scheduler.minimum-allocation-vcores 1 |
|
yarn.scheduler.maximum-allocation-vcores 32 |
|
yarn.nodemanager.resource.memory-mb 8192 |
|
3)shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好
配置參數 |
參數說明 |
mapreduce.task.io.sort.mb 100 |
shuffle的環形緩衝區大小,默認100m |
mapreduce.map.sort.spill.percent 0.8 |
環形緩衝區溢出的閾值,默認80% |
配置參數 |
參數說明 |
mapreduce.map.maxattempts |
每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。 |
mapreduce.reduce.maxattempts |
每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。 |
mapreduce.map.failures.maxpercent |
當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。 |
mapreduce.reduce.failures.maxpercent |
當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0。 |
mapreduce.task.timeout |
Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。 |
1)數據準備:
[root@master001 jarHadoop]# hadoop fs -cat /ghh/input/hello.txt i go now for you said you like me once to the time to life rather than to life in time to the time to life rather than to life in time life had a lot of things is futile but we still want to experience good relationships just happen they take time patience and two people who truly want to be together [root@master001 jarHadoop]#
2)分析
按照mapreduce編程規範,分別編寫Mapper,Reducer,Driver。
3)編寫程序
(1)定義一個mapper類
package com.WordCount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN:默認狀況下,是mr框架所讀到的一行文本的起始偏移量,Long; * 在hadoop中有本身的更精簡的序列化接口,因此不直接用Long,而是用LongWritable * VALUEIN:默認狀況下,是mr框架所讀到的一行文本內容,String;此處用Text * * KEYOUT:是用戶自定義邏輯處理完成以後輸出數據中的key,在此處是單詞,String;此處用Text * VALUEOUT,是用戶自定義邏輯處理完成以後輸出數據中的value,在此處是單詞次數,Integer,此處用IntWritable * @author Administrator */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * map階段的業務邏輯就寫在自定義的map()方法中 * maptask會對每一行輸入數據調用一次咱們自定義的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 將maptask傳給咱們的文本內容先轉換成String String line = value.toString(); // 2 根據空格將這一行切分紅單詞 String[] words = line.split(" "); // 3 將單詞輸出爲<單詞,1> for(String word:words){ // 將單詞做爲key,將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會到相同的reducetask中 context.write(new Text(word), new IntWritable(1)); } } }
(2)定義一個reducer類
package com.WordCount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * KEYIN , VALUEIN 對應mapper輸出的KEYOUT, VALUEOUT類型 * * KEYOUT,VALUEOUT 對應自定義reduce邏輯處理結果的輸出數據類型 KEYOUT是單詞 VALUEOUT是總次數 * @author Administrator */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * key,是一組相同單詞kv對的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; // 1 彙總各個key的個數 for(IntWritable value:values){ count +=value.get(); } // 2輸出該key的總次數 context.write(key, new IntWritable(count)); } }
(3)定義一個主類,用來描述job並提交job
package com.WordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 至關於一個yarn集羣的客戶端, * 須要在此封裝咱們的mr程序相關運行參數,指定jar包 * 最後提交給yarn * @author Administrator */ public class WordcountDriver { public static void main(String[] args) throws Exception { // 1 獲取配置信息,或者job對象實例 Configuration configuration = new Configuration(); // 8 配置提交到yarn上運行,windows和Linux變量不一致 // configuration.set("mapreduce.framework.name", "yarn"); // configuration.set("yarn.resourcemanager.hostname", "hadoop103"); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路徑 // job.setJar("/home/gh/wc.jar"); job.setJarByClass(WordcountDriver.class); // 2 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 3 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 4 指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 5 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 // job.submit(); boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }
(4)將程序打成jar包,而後拷貝到hadoop集羣中。
(5)啓動hadoop集羣
(6)執行wordcount程序
[root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.WordCount.WordcountDriver /ghh/inPut /ghh/output
0)分析
1)自定義分區
package com.WordCount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WordCountPartitioner extends Partitioner<Text, IntWritable>{ @Override public int getPartition(Text key, IntWritable value, int numPartitions) { // 1 獲取單詞key String firWord = key.toString().substring(0, 1); char[] charArray = firWord.toCharArray(); int result = charArray[0]; // int result = key.toString().charAt(0); // 2 根據奇數偶數分區 if (result % 2 == 0) { return 0; }else { return 1; } } }
2)在驅動中配置加載分區,設置reducetask個數
job.setPartitionerClass(WordCountPartitioner.class); job.setNumReduceTasks(2);
0)需求:統計過程當中對每個maptask的輸出進行局部彙總,以減少網絡傳輸量,即採用Combiner功能。
方案一
1)增長一個WordcountCombiner類繼承Reducer
package com.WordCount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
2)在WordcountDriver驅動類中指定combiner
// 9 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯 job.setCombinerClass(WordcountCombiner.class);
方案二
1)將WordcountReducer做爲combiner在WordcountDriver驅動類中指定
// 9 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯 job.setCombinerClass(WordcountReducer.class);
運行程序
0)需求:將輸入的大量小文件合併成一個切片統一處理。
1)輸入數據:準備5個小文件
2)實現過程
(1)不作任何處理,運行需求1中的wordcount程序,觀察切片個數爲5
(2)在WordcountDriver中增長以下代碼,運行程序,便可看到運行的切片個數爲1
// 9 若是不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
1)需求:
統計每個手機號耗費的總上行流量、下行流量、總流量
2)數據準備
[root@master001 jarHadoop]# hadoop fs -cat /ghh/input1/phone_data.txt
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985066 13560436666 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 [root@master001 jarHadoop]#
輸入數據格式:
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
手機號碼 上行流量 下行流量
輸出數據格式
13560436666 1116 954 2070
手機號碼 上行流量 下行流量 總流量
3)分析
基本思路:
Map階段:
(1)讀取一行數據,切分字段
(2)抽取手機號、上行流量、下行流量
(3)以手機號爲key,bean對象爲value輸出,即context.write(手機號,bean);
Reduce階段:
(1)累加上行流量和下行流量獲得總流量。
(2)實現自定義的bean來封裝流量信息,並將bean做爲map輸出的key來傳輸
(3)MR程序在處理數據的過程當中會對數據排序(map輸出的kv對傳輸到reduce以前,會排序),排序的依據是map輸出的key
因此,咱們若是要實現本身須要的排序規則,則能夠考慮將排序因素放到key中,讓key實現接口:WritableComparable。而後重寫key的compareTo方法。
4)編寫mapreduce程序
(1)編寫流量統計的bean對象
package com.flowsum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; // bean對象要實例化 public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; // 反序列化時,須要反射調用空參構造函數,因此必須有 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } /** * 序列化方法 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }
(2)編寫mapreduce主程序
package com.flowsum; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCount { static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 將一行內容轉成string String ling = value.toString(); String newstr1=ling.replaceAll(" ",","); //先把全部空格替換成 逗號。 String newstr2=newstr1.replaceAll("\t",","); //再把全部的製表符替換成逗號。 String newstr3=newstr2.replaceAll(",+", ","); //把全部重複的逗號合併成一個逗號。 // 2 切分字段 String[] fields = newstr3.split(","); // 3 取出手機號碼 String phoneNum = fields[1]; // 4 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); // 5 寫出數據 context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow)); } } static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; // 1 遍歷所用bean,將其中的上行流量,下行流量分別累加 for (FlowBean bean : values) { sum_upFlow += bean.getUpFlow(); sum_downFlow += bean.getDownFlow(); } // 2 封裝對象 FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); context.write(key, resultBean); } } public static void main(String[] args) throws Exception { // 1 獲取配置信息,或者job對象實例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路徑 job.setJarByClass(FlowCount.class); // 2 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 3 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 4 指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
(3)將程序打成jar包,而後拷貝到hadoop集羣中。
(4)啓動hadoop集羣
(5)執行flowcount程序
[root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCount /ghh/input1 /ghh/output2
(6)查看結果
0)需求:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)
1)數據準備,同上面需求1的數據
2)分析
(1)Mapreduce中會將map輸出的kv對,按照相同key分組,而後分發給不一樣的reducetask。默認的分發規則爲:根據key的hashcode%reducetask數來分發
(2)若是要按照咱們本身的需求進行分組,則須要改寫數據分發(分組)組件Partitioner
自定義一個CustomPartitioner繼承抽象類:Partitioner
(3)在job驅動中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)
3)在需求1的基礎上,增長一個分區類
package com.flowsum; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * K2 V2 對應的是map輸出kv類型 * @author Administrator */ public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判斷是哪一個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
2)在驅動函數中增長自定義數據分區設置和reduce task設置
// 8 指定自定義數據分區 job.setPartitionerClass(ProvincePartitioner.class); // 9 同時指定相應數量的reduce task job.setNumReduceTasks(5);
3)將程序打成jar包,而後拷貝到hadoop集羣中。
4)啓動hadoop集羣
5)執行flowcountPartitionser程序
[root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCount /ghh/input1 /ghh/output3
6)查看結果
0)需求
根據需求1產生的結果再次對總流量進行排序。
1)數據準備,同需求1數據
2)分析
(1)把程序分兩步走,第一步正常統計總流量,第二步再把結果進行排序
(2)context.write(總流量,手機號)
(3)FlowBean實現WritableComparable接口重寫compareTo方法
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
3)FlowBean對象在在需求1基礎上增長了比較功能
package com.flowsum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean1 implements WritableComparable<FlowBean1> { private long upFlow; private long downFlow; private long sumFlow; // 反序列化時,須要反射調用空參構造函數,因此必須有 public FlowBean1() { super(); } public FlowBean1(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } /** * 序列化方法 * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo(FlowBean1 o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } }
4)Map方法優化爲一個對象,reduce方法則直接輸出結果便可,驅動函數根據輸入輸出重寫配置便可。
package com.flowsum; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSort { static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean1, Text>{ FlowBean1 bean = new FlowBean1(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 拿到的是上一個統計程序輸出的結果,已是各手機號的總流量信息 String line = value.toString(); String newstr1=line.replaceAll(" ",","); //先把全部空格替換成 逗號。 String newstr2=newstr1.replaceAll("\t",","); //再把全部的製表符替換成逗號。 String newstr3=newstr2.replaceAll(",+", ","); //把全部重複的逗號合併成一個逗號。 // 2 截取字符串並獲取電話號、上行流量、下行流量 String[] fields = newstr3.split(","); String phoneNbr = fields[0]; // 4 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); // 3 封裝對象 bean.set(upFlow, downFlow); v.set(phoneNbr); // 4 輸出 context.write(bean, v); } } static class FlowCountSortReducer extends Reducer<FlowBean1, Text, Text, FlowBean1>{ @Override protected void reduce(FlowBean1 bean, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), bean); } } public static void main(String[] args) throws Exception { // 1 獲取配置信息,或者job對象實例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路徑 job.setJarByClass(FlowCountSort.class); // 2 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 3 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(FlowBean1.class); job.setMapOutputValueClass(Text.class); // 4 指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean1.class); // 5 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); Path outPath = new Path(args[1]); // FileSystem fs = FileSystem.get(configuration); // if (fs.exists(outPath)) { // fs.delete(outPath, true); // } FileOutputFormat.setOutputPath(job, outPath); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
5)將程序打成jar包,而後拷貝到hadoop集羣中。
6)啓動hadoop集羣
7)執行flowcountsort程序
[root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.flowsum.FlowCountSort /ghh/input1 /ghh/output4
8)查看結果
1)需求
有以下訂單數據
訂單id |
商品id |
成交金額 |
Order_0000001 |
Pdt_01 |
222.8 |
Order_0000001 |
Pdt_05 |
25.8 |
Order_0000002 |
Pdt_03 |
522.8 |
Order_0000002 |
Pdt_04 |
122.4 |
Order_0000002 |
Pdt_05 |
722.4 |
Order_0000003 |
Pdt_01 |
222.8 |
Order_0000003 |
Pdt_02 |
33.8 |
如今須要求出每個訂單中最貴的商品。
2)輸入數據
[root@master001 jarHadoop]# hadoop fs -cat /ghh/input2/GroupingComparator.txt0000001 Pdt_01 222.8 0000002 Pdt_05 722.4 0000001 Pdt_05 25.8 0000003 Pdt_01 222.8 0000003 Pdt_01 33.8 0000002 Pdt_03 522.8 0000002 Pdt_04 122.4 [root@master001 jarHadoop]#
3)分析
(1)利用「訂單id和成交金額」做爲key,能夠將map階段讀取到的全部訂單數據按照id分區,按照金額排序,發送到reduce。
(2)在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,而後取第一個便是最大值。
4)實現
定義訂單信息OrderBean
package com.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private double price; public OrderBean() { super(); } public OrderBean(String orderId, double price) { super(); this.orderId = orderId; this.price = price; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.price = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeDouble(price); } @Override public int compareTo(OrderBean o) { // 1 先按訂單id排序(從小到大) int result = this.orderId.compareTo(o.getOrderId()); if (result == 0) { // 2 再按金額排序(從大到小) result = price > o.getPrice() ? -1 : 1; } return result; } @Override public String toString() { return orderId + "\t" + price ; } }
編寫OrderSortMapper處理流程
package com.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean bean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 獲取一行數據 String line = value.toString(); String newstr1=line.replaceAll(" ",","); //先把全部空格替換成 逗號。 String newstr2=newstr1.replaceAll("\t",","); //再把全部的製表符替換成逗號。 String newstr3=newstr2.replaceAll(",+", ","); //把全部重複的逗號合併成一個逗號。 // 2 截取字段 String[] fields = newstr3.split(","); // 3 封裝bean bean.setOrderId(fields[0]); bean.setPrice(Double.parseDouble(fields[2])); // 4 寫出 context.write(bean, NullWritable.get()); } }
編寫OrderSortReducer處理流程
package com.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ @Override protected void reduce(OrderBean bean, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 直接寫出 context.write(bean, NullWritable.get()); } }
編寫OrderSortDriver處理流程
package com.order; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderSortDriver { public static void main(String[] args) throws Exception { // 1 獲取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 設置jar包加載路徑 job.setJarByClass(OrderSortDriver.class); // 3 加載map/reduce類 job.setMapperClass(OrderSortMapper.class); job.setReducerClass(OrderSortReducer.class); // 4 設置map輸出數據key和value類型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 設置最終輸出數據的key和value類型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 設置輸入數據和輸出數據路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 10 設置reduce端的分組 job.setGroupingComparatorClass(OrderSortGroupingComparator.class); // 7 設置分區 job.setPartitionerClass(OrderSortPartitioner.class); // 8 設置reduce個數 job.setNumReduceTasks(3); // 9 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
編寫OrderSortPartitioner處理流程
package com.order; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) { return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
編寫OrderSortGroupingComparator處理流程
package com.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderSortGroupingComparator extends WritableComparator { protected OrderSortGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; // 將orderId相同的bean都視爲一組 return abean.getOrderId().compareTo(bbean.getOrderId()); } }
5)將程序打成jar包,而後拷貝到hadoop集羣中。
6)啓動hadoop集羣
7)執行order程序
[root@master001 jarHadoop]# hadoop jar hadoopTest.jar com.order.OrderSortDriver /ghh/input2 /ghh/output5
8)查看結果