序列化就是把內存中的對象,轉換成字節序列 (或其餘數據傳輸協議) 以便於存儲 (持久化) 和網絡傳輸。 前端
反序列化就是將收到字節序列 (或其餘數據傳輸協議) 或者是硬盤的持久化數據,轉換成內存中的對象。java
Java 的序列化是一個重量級序列化框架 (Serializable) ,一個對象被序列化後,會附帶不少額外的信息 (各類校驗信息,header,繼承體系等) ,不便於在網絡中高效傳輸。因此,Hadoop 本身開發了一套序列化機制 (Writable),精簡、高效。算法
經常使用的數據類型對應的 Hadoop 數據序列化類型apache
Java類型 | Hadoop Writable類型 |
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
map | MapWritable |
array | ArrayWritable |
自定義 bean 對象要想序列化傳輸,必須實現序列化接口,須要注意如下7項:
(1) 必須實現 Writable 接口;
(2) 反序列化時,須要反射調用空參構造函數,因此必須有空參構造;
(3) 重寫序列化方法;
(4) 重寫反序列化方法;
(5) 注意反序列化的順序和序列化的順序徹底一致;
(6) 要想把結果顯示在文件中,須要重寫 toString(),且用 "\t" 分開,方便後續用;
(7) 若是須要將自定義的 bean 放在 key 中傳輸,則還須要實現 comparable 接口,由於 mapreduce 過程當中的 shuffle 過程必定會對 key 進行排序。json
// 1 必須實現Writable接口 public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; //2 反序列化時,須要反射調用空參構造函數,因此必須有 public FlowBean() { super(); } /** * 3重寫序列化方法 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 4 重寫反序列化方法 * 5 注意反序列化的順序和序列化的順序徹底一致 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } // 6要想把結果顯示在文件中,須要重寫toString(),且用」\t」分開,方便後續用 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } //7 若是須要將自定義的bean放在key中傳輸,則還須要實現comparable接口,由於mapreduce框中的shuffle過程必定會對key進行排序 @Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } }
waitForCompletion() submit(); // 1 創建鏈接 connect(); // 1)建立提交job的代理 new Cluster(getConfiguration()); // (1)判斷是本地yarn仍是遠程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1) 建立給集羣提交數據的Stag路徑 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)獲取jobid ,並建立job路徑 JobID jobId = submitClient.getNewJobID(); // 3)拷貝jar包到集羣 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規劃文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路徑寫xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交狀態 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
(1) 找到你數據存儲的目錄;
(2) 開始遍歷處理 (規劃切片) 目錄下的每個文件;
(3) 遍歷第一個文件 ss.txt;
A、獲取文件大小 fs.sizeOf(ss.txt);
B、計算切片大小 computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M;
C、默認狀況下,切片大小 =blocksize;
D、開始切,造成第 1 個切片:ss.txt—0:128M 第 2 個切片 ss.txt—128:256M 第 3 個切片 ss.txt—256M:300M (每次切片時,都要判斷切完剩下的部分是否大於塊的 1.1 倍,不大於 1.1 倍就劃分一塊切片);
E、將切片信息寫到一個切片規劃文件中;
F、整個切片的核心過程在 getSplit() 方法中完成;
G、數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分紅分片進行存儲。InputSplit 只記錄了分片的元數據信息,好比起始位置、長度以及所在的節點列表等;
H、注意:block 是 HDFS 上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分;
(4) 提交切片規劃文件到 yarn 上,yarn 上的 MrAppMaster 就能夠根據切片規劃文件計算開啓 maptask 個數。緩存
(1) 簡單地按照文件的內容長度進行切片;
(2) 切片大小,默認等於 block 大小;
(3) 切片時不考慮數據集總體,而是逐個針對每個文件單獨切片。
好比待處理數據有兩個文件:bash
file1.txt 320M file2.txt 10M
通過 FileInputFormat 的切片機制運算後,造成的切片信息以下: 網絡
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
獲取切片信息 API:數據結構
// 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName();
經過分析源碼,在 FileInputFormat 中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值爲 1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值 Long.MAXValue
所以,默認狀況下,切片大小 =blocksize。
maxsize (切片最大值):參數若是調得比 blocksize 小,則會讓切片變小,並且就等於配置的這個參數的值。
minsize (切片最小值):參數調的比 blockSize 大,則可讓切片變得比 blocksize 還大。架構
關於大量小文件的優化策略
1. 默認狀況下 TextInputformat 對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個 maptask,這樣若是有大量小文件,就會產生大量的 maptask,處理效率極其低下。
2. 優化策略
(1) 最好的辦法,在數據處理系統的最前端 (預處理/採集),將小文件先合併成大文件,再上傳到 HDFS 作後續分析;
(2) 補救措施:若是已是大量小文件在 HDFS 中了,可使用另外一種 InputFormat 來作切片 (CombineTextInputFormat),它的切片邏輯跟 TextFileInputFormat 不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個 maptask;
(3) 優先知足最小切片大小,不超過最大切片大小。
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m 舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3. 具體實現步驟
//若是不設置 InputFormat,它默認用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
1. 採用自定義 InputFormat 的方式,處理輸入小文件的問題。
(1) 自定義一個 InputFormat;
(2) 改寫 RecordReader,實現一次讀取一個完整文件封裝爲 KV;
(3) 在輸出時使用 SequenceFileOutPutFormat 輸出合併文件。
2. 程序實現
(1) 自定義 InputFormat
package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 1 定義一個本身的recordReader WholeRecordReader recordReader = new WholeRecordReader(); // 2 初始化recordReader recordReader.initialize(split, context); return recordReader; } }
(2) 自定義 RecordReader
package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit split; private Configuration configuration; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 獲取傳遞過來的數據 this.split = (FileSplit) split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { // 1 定義緩存 byte[] contents = new byte[(int) split.getLength()]; // 2 獲取文件系統 Path path = split.getPath(); FileSystem fs = path.getFileSystem(configuration); // 3 讀取內容 FSDataInputStream fis = null; try { // 3.1 打開輸入流 fis = fs.open(path); // 3.2 讀取文件內容 IOUtils.readFully(fis, contents, 0, contents.length); // 3.3 輸出文件內容 value.set(contents, 0, contents.length); } catch (Exception e) { } finally { IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return processed?1:0; } @Override public void close() throws IOException { } }
(3) InputFormatDriver 處理流程
package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; public class InputFormatDriver { static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { private Text filenameKey; @Override protected void setup(Context context) throws IOException, InterruptedException { // 獲取切片信息 InputSplit split = context.getInputSplit(); // 獲取切片路徑 Path path = ((FileSplit) split).getPath(); // 根據切片路徑獲取文件名稱 filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { // 文件名稱爲key context.write(filenameKey, value); } } public static void main(String[] args) throws Exception { args = new String[] { "e:/input", "e:/output11" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(InputFormatDriver.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
maptask 的並行度決定 map 階段的任務處理併發度,進而影響到整個 job 的處理速度。那麼,mapTask 並行任務是否越多越好呢?
一個 job 的 map 階段 MapTask 並行度(個數),由客戶端提交 job 時的切片個數決定。
(1) Read 階段:Map Task 經過用戶編寫的 RecordReader,從輸入 InputSplit 中解析出一個個 key/value。
(2) Map 階段:該節點主要是將解析出的 key/value 交給用戶編寫 map() 函數處理,併產生一系列新的 key/value。
(3) Collect 階段:在用戶編寫 map() 函數中,當數據處理完成後,通常會調用 OutputCollector.collect() 輸出結果。在該函數內部,它會將生成的 key/value 分區 (調用 Partitioner),並寫入一個環形內存緩衝區中。
(4) Spill 階段:即「溢寫」,當環形緩衝區滿後,MapReduce 會將數據寫到本地磁盤上,生成一個臨時文件。須要注意的是,將數據寫入本地磁盤以前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操做。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號 partition 進行排序,而後按照 key 進行排序。這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照 key 有序;
步驟2:按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件 output/spillN.out (N表示當前溢寫次數) 中。若是用戶設置了 Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做;
步驟3:將分區數據的元信息寫到內存索引數據結構 SpillRecord 中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當期內存索引大小超過 1MB,則將內存索引寫到文件 output/spillN.out.index 中。
(4) Combine 階段:當全部數據處理完成後,MapTask 對全部臨時文件進行一次合併,以確保最終只會生成一個數據文件。
當全部數據處理完後,MapTask 會將全部臨時文件合併成一個大文件,並保存到文件 output/file.out 中,同時生成相應的索引文件 output/file.out.index。
在進行文件合併過程當中,MapTask 以分區爲單位進行合併。對於某個分區,它將採用多輪遞歸合併的方式。每輪合併 io.sort.factor (默認100) 個文件,並將產生的文件從新加入待合併列表中,對文件排序後,重複以上過程,直到最終獲得一個大文件。
讓每一個 MapTask 最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
Mapreduce 確保每一個 reducer 的輸入都是按鍵排序的。系統執行排序的過程 (即將 map 輸出做爲輸入傳給 reducer) 稱爲 shuffle。
上面的流程是整個 mapreduce 最全工做流程,可是 shuffle 過程只是從第 7 步開始到第 16 步結束,具體 shuffle 過程詳解,以下:
(1) maptask 收集咱們的 map() 方法輸出的 kv 對,放到內存緩衝區中;
(2) 從內存緩衝區不斷溢出本地磁盤文件,可能會溢出多個文件;
(3) 多個溢出文件會被合併成大的溢出文件;
(4) 在溢出過程當中,及合併的過程當中,都要調用 partitoner 進行分組和針對 key 進行排序;
(5) reducetask 根據本身的分區號,去各個 maptask 機器上取相應的結果分區數據;
(6) reducetask 會取到同一個分區的來自不一樣 maptask 的結果文件,reducetask 會將這些文件再進行合併(歸併排序);
(7) 合併成大文件後,shuffle 的過程也就結束了,後面進入 reducetask 的邏輯運算過程(從文件中取出一個一個的鍵值對 group,調用用戶自定義的 reduce() 方法)。
Shuffle 中的緩衝區大小會影響到 mapreduce 程序的執行效率,原則上說,緩衝區越大,磁盤 io 的次數越少,執行速度就越快。
緩衝區的大小能夠經過參數調整,參數:io.sort.mb 默認100M。
要求將統計結果按照條件輸出到不一樣文件中(分區)。好比:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件中(分區)
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
默認分區是根據 key 的 hashCode 對 reduceTasks 個數取模獲得的。用戶無法控制哪一個 key 存儲到哪一個分區。
(1) 自定義類繼承 Partitioner,重寫 getPartition() 方法。
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判斷是哪一個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
(2) 在 job 驅動中,設置自定義 partitioner。
job.setPartitionerClass(CustomPartitioner.class)
(3) 自定義 partition 後,要根據自定義 partitioner 的邏輯設置相應數量的 reduce task。
job.setNumReduceTasks(5);
若是 reduceTask 的數量大於 getPartition 的結果數,則會多產生幾個空的輸出文件 part-r-000xx;
若是 reduceTask 的數量大於 1 且小於 getPartition 的結果數,則有一部分分區數據無處安放,會報 Exception;
若是 reduceTask 的數量等於1,則無論 mapTask 端輸出多少個分區文件,最終結果都交給這一個 reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數爲5,則
(1) job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2) job.setNumReduceTasks(2);會報錯
(3) job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
排序是 MapReduce 框架中最重要的操做之一。Map Task 和 Reduce Task 均會對數據 (按照 key )進行排序。該操做屬於 Hadoop 的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。
對於 Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。
對於 Reduce Task,它從每一個 Map Task 上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task 統一對內存和磁盤上的全部數據進行一次合併。
(1) 部分排序
MapReduce 根據輸入記錄的鍵對數據集排序。保證輸出的每一個文件內部排序。
(2) 全排序
如何用 Hadoop 產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了 MapReduce 所提供的並行架構。
替代方案:首先建立一系列排好序的文件;其次,串聯這些文件;最後,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立 3 個分區,在第一分區中,記錄的單詞首字母 a-g,第二分區記錄單詞首字母 h-n, 第三分區記錄單詞首字母 o-z。
(3) 輔助排序( GroupingComparator 分組)
Mapreduce 框架在記錄到達 reducer 以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的 map 任務且這些 map 任務在不一樣輪次中完成時間各不相同。通常來講,大多數 MapReduce 程序會避免讓 reduce 函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。
(1) 原理分析
bean 對象實現 WritableComparable 接口重寫 compareTo() 方法,就能夠實現排序。
(2) 案例
統計每一個手機號的流量,並按總量排序。
A、改造 FlowBean 對象,添加比較功能。
package com.mr.sort import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; // 反序列化時,須要反射調用空參構造函數,因此必須有 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } /** * 序列化方法 * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 注意反序列化的順序和序列化的順序徹底一致 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } }
B、Map 方法優化爲一個對象,reduce 方法則直接輸出結果便可,驅動函數根據輸入輸出重寫配置便可。
package com.mr.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowCountSort { static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ FlowBean bean = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 拿到的是上一個統計程序輸出的結果,已是各手機號的總流量信息 String line = value.toString(); // 2 截取字符串並獲取電話號、上行流量、下行流量 String[] fields = line.split("\t"); String phoneNbr = fields[0]; long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); // 3 封裝對象 bean.set(upFlow, downFlow); v.set(phoneNbr); // 4 輸出 context.write(bean, v); } } static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ @Override protected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), bean); } } public static void main(String[] args) throws Exception { // 1 獲取配置信息,或者job對象實例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路徑 job.setJarByClass(FlowCountSort.class); // 2 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FlowCountSortMapper.class); job.setReducerClass(FlowCountSortReducer.class); // 3 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 4 指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); Path outPath = new Path(args[1]); // FileSystem fs = FileSystem.get(configuration); // if (fs.exists(outPath)) { // fs.delete(outPath, true); // } FileOutputFormat.setOutputPath(job, outPath); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
C、將程序打成 jar 包,而後拷貝到 Hadoop 集羣中執行 FlowCountSort 程序,並查看執行結果。
# 執行程序 hadoop jar FlowCountSort.jar com.mr.sort.FlowCountSort /input/flowcount/ /output/flowcount/ # 查看程序結果 hadoop fs -cat /output/flowcount/part-r-00000
功能:對 reduce 階段的數據根據某一個或幾個字段進行分組。
案例:求每一個訂單中最貴的商品
有以下訂單數據,如今須要求出每個訂單中最貴的商品。
訂單原始數據 訂單id 商品id 成交金額 Order_0000001 Pdt_01 222.8 Order_0000001 Pdt_05 25.8 Order_0000002 Pdt_03 522.8 Order_0000002 Pdt_04 122.4 Order_0000002 Pdt_05 722.4 Order_0000003 Pdt_01 222.8 Order_0000003 Pdt_02 33.8 預期結果數據 part-r-00000 文件 Order_0000001 222.8 part-r-00001 文件 Order_0000002 722.4 part-r-00002 文件 Order_0000003 222.8
將上述訂單原始數據的數據部分寫入 GroupingComparatorTest.txt 文件中,以後做爲程序的輸入文件,預期獲得三個輸出數據結果文件 part-r-00000、part-r-0000一、part-r-00002。
(1) 利用「訂單 id 和成交金額」做爲 key,能夠將 map 階段讀取到的全部訂單數據按照 id 分區,按照金額排序,發送到 reduce。
(2) 在 reduce 端利用 groupingcomparator 將訂單 id 相同的 kv 聚合成組,而後取第一個便是最大值。
(1) 定義訂單信息 OrderBean
package com.test.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private double price; public OrderBean() { super(); } public OrderBean(String orderId, double price) { super(); this.orderId = orderId; this.price = price; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.price = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeDouble(price); } @Override public int compareTo(OrderBean o) { // 1 先按訂單id排序(從小到大) int result = this.orderId.compareTo(o.getOrderId()); if (result == 0) { // 2 再按金額排序(從大到小) result = price > o.getPrice() ? -1 : 1; } return result; } @Override public String toString() { return orderId + "\t" + price ; } }
(2) OrderSortMapper 處理流程
package com.test.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean bean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 獲取一行數據 String line = value.toString(); // 2 截取字段 String[] fields = line.split("\t"); // 3 封裝bean bean.setOrderId(fields[0]); bean.setPrice(Double.parseDouble(fields[2])); // 4 寫出 context.write(bean, NullWritable.get()); } }
(3) 編寫 OrderSortReducer 處理流程
package com.test.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ @Override protected void reduce(OrderBean bean, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 直接寫出 context.write(bean, NullWritable.get()); } }
(4) 編寫 OrderSortDriver 處理流程
package com.test.mapreduce.order; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderSortDriver { public static void main(String[] args) throws Exception { // 1 獲取配置信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 設置jar包加載路徑 job.setJarByClass(OrderSortDriver.class); // 3 加載map/reduce類 job.setMapperClass(OrderSortMapper.class); job.setReducerClass(OrderSortReducer.class); // 4 設置map輸出數據key和value類型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 設置最終輸出數據的key和value類型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 設置輸入數據和輸出數據路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 10 設置reduce端的分組 job.setGroupingComparatorClass(OrderSortGroupingComparator.class); // 7 設置分區 job.setPartitionerClass(OrderSortPartitioner.class); // 8 設置reduce個數 job.setNumReduceTasks(3); // 9 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
(5) 編寫 OrderSortPartitioner 處理流程
package com.test.mapreduce.order; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) { return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
(6) 編寫 OrderSortGroupingComparator 處理流程
package com.test.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderSortGroupingComparator extends WritableComparator { protected OrderSortGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; // 將orderId相同的bean都視爲一組 return abean.getOrderId().compareTo(bbean.getOrderId()); } }
(1) combiner 是 MR 程序中 Mapper 和 Reducer 以外的一種組件
(2) combiner 組件的父類就是 Reducer
(3) combiner 和 reducer 的區別在於運行的位置:Combiner 是在每個 maptask 所在的節點運行,Reducer 是接收全局全部 Mapper 的輸出結果;
(4) combiner 的意義就是對每個 maptask 的輸出進行局部彙總,以減少網絡傳輸量。
(5) combiner 可以應用的前提是不能影響最終的業務邏輯,並且,combiner 的輸出 kv 應該跟 reducer 的輸入 kv 類型要對應起來。
Mapper 3 5 7 ->(3+5+7)/3=5 2 6 ->(2+6)/2=4 Reducer (3+5+7+2+6)/5=23/5 不等於 (5+4)/2=9/2
需求:統計單詞計數的過程當中對每個 maptask 的輸出進行局部彙總,以減少網絡傳輸量即採用 Combiner 功能。
方案一:增長一個 WordcountCombiner 類繼承 Reducer,而後在 WordcountDriver 驅動類中指定 combiner。
package com.test.mr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } } // 驅動類中指定須要使用combiner,以及用哪一個類做爲combiner的邏輯 job.setCombinerClass(WordcountCombiner.class);
方案二:將 WordcountReducer 做爲 combiner 在 WordcountDriver 驅動類中指定,即直接在驅動類中指定 WordCountReducer 做爲 combine。
// 指定須要使用combiner,以及用哪一個類做爲combiner的邏輯 job.setCombinerClass(WordcountReducer.class);