排序是MapReduce框架中最重要的操做之一。java
在Mapper和Reducer階段都有涉及,Map Task和Reduce Task均會對數據(按照key)進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。web
對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。apache
對於Reduce Task,它從每一個Map Task上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。架構
若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。app
一、部分排序框架
MapReduce根據輸入記錄的鍵對數據集排序,保證輸出的每一個文件內部排序。webapp
二、全排序ide
如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構。函數
全排序的解決方案:oop
主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
三、輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。
通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。
不少時候,默認的排序邏輯並不能知足咱們的需求,例如流量Bean的排序。咱們須要自定義排序邏輯。
原理很簡單:對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序。
public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
咱們如今有了這樣的需求,須要將手機流量的輸出結果集不按默認的手機號碼序輸出,而是按照彙總流量從高到低輸出。
一、分析
首先要明確的是,排序只能針對key進行,而咱們的彙總流量屬於Val,所以,咱們須要將FlowBean做爲key進行中間排序。須要從新定義一個MapReducer任務。
二、編寫代碼
(1)建立一個新的模塊,phone-flow-sort。
(2)編寫流量Bean,和以前的Bean差很少,只不過實現了排序接口
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements Writable, WritableComparable<FlowBean> { ... // desc sort. public int compareTo(FlowBean other) { return this.getTotalFlow()>other.getTotalFlow()? -1:1; } ... }
(3)Mapper
package com.zhaoyi.flowbeansort; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> { FlowBean bean = new FlowBean(); Text phone = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] strings = line.split("\t"); // Key-FlowBean: get the up and downFlow. bean.set(Long.parseLong(strings[1]), Long.parseLong(strings[2])); // Val-Phone phone.set(value); context.write(bean, phone); } }
須要明確的是,咱們當前的輸入文件即爲phoneflow項目的輸出結果,其結構以下
13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954 2070 13560439658 2034 5892 7926 13602846565 1938 2910 4848 13660577991 6960 690 7650 13719199419 240 0 240 13726230503 2481 24681 27162 13726238888 2481 24681 27162 13760778710 120 120 240 13826544101 264 0 264 13922314466 3008 3720 6728 13925057413 11058 48243 59301 13926251106 240 0 240 13926435656 132 1512 1644 15013685858 3659 3538 7197 15920133257 3156 2936 6092 15989002119 1938 180 2118 18211575961 1527 2106 3633 18320173382 9531 2412 11943 84138413 4116 1432 5548
所以,相對於當前的Mapper,輸入類型爲行號,Val爲Text,即一行數據。如今,咱們只需將FlowBean做爲Mapper的key輸出,電話號碼做爲val。
而後在reducer中將他們顛倒,便可完成咱們的需求。
(4)Reducer
package com.zhaoyi.flowbeansort; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> { @Override protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // the values must be only one, and we just need upside down key-val. context.write(values.iterator().next(), key); } }
(5)Driver
package com.zhaoyi.flowbeansort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowSortDriver { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(FlowSortDriver.class); job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 1:0); } }
因爲Mapper與Reducer的輸出類型不同,所以須要指定Map的輸出k-v類型,不能像以前的那樣省略這裏的代碼了。
配置參數program arg,以以前的項目的輸出做爲輸入。運行項目,將輸出結果生成到output2中。
D:\hadoop\output D:\hadoop\output2
查看結果分區文件
13502468823 7335 110349 117684 7335 110349 117684 13925057413 11058 48243 59301 11058 48243 59301 13726238888 2481 24681 27162 2481 24681 27162 13726230503 2481 24681 27162 2481 24681 27162 18320173382 9531 2412 11943 9531 2412 11943 13560439658 2034 5892 7926 2034 5892 7926 13660577991 6960 690 7650 6960 690 7650 15013685858 3659 3538 7197 3659 3538 7197 13922314466 3008 3720 6728 3008 3720 6728 15920133257 3156 2936 6092 3156 2936 6092 84138413 4116 1432 5548 4116 1432 5548 13602846565 1938 2910 4848 1938 2910 4848 18211575961 1527 2106 3633 1527 2106 3633 15989002119 1938 180 2118 1938 180 2118 13560436666 1116 954 2070 1116 954 2070 13926435656 132 1512 1644 132 1512 1644 13480253104 180 180 360 180 180 360 13826544101 264 0 264 264 0 264 13926251106 240 0 240 240 0 240 13760778710 120 120 240 120 120 240 13719199419 240 0 240 240 0 240
實現了咱們最終的需求。
能夠看到,整個過程咱們其實只是定義了FlowBean的排序邏輯,同時將其做爲key而已(具體來講,是Mapper的輸出key)。