本節所用到的數據下載地址爲:http://pan.baidu.com/s/1bnfELmZjava
咱們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,咱們能夠經過下面這幾個例子來體現出來。其中的數據和任務以下圖1.1,1.2所示。算法
#首先按照第一列升序排列,當第一列相同時,第二列升序排列 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果 1 1 2 1 2 2 3 1 3 2 3 3
圖 1.1 排序sql
#當第一列相同時,求出第二列的最小值 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #結果 3 1 2 1 1 1
圖 1.2 分組數據庫
使用MapReduce默認排序算法代碼以下1.1所示,在代碼中我將第一列做爲鍵,第二列做爲值。apache
1 package sort; 2 3 import java.io.IOException; 4 import java.net.URI; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FileStatus; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 20 21 public class SortApp { 22 private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 23 private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 24 public static void main(String[] args) throws Exception { 25 Configuration conf=new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outpath = new Path(OUT_PATH); 28 if(fileSystem.exists(outpath)){ 29 fileSystem.delete(outpath,true); 30 } 31 32 final Job job = new Job(conf,SortApp.class.getSimpleName()); 33 34 //1.1 指定輸入文件路徑 35 FileInputFormat.setInputPaths(job, INPUT_PATH); 36 job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件 37 38 //1.2指定自定義的Mapper類 39 job.setMapperClass(MyMapper.class); 40 job.setMapOutputKeyClass(LongWritable.class);//指定輸出<k2,v2>的類型 41 job.setMapOutputValueClass(LongWritable.class); 42 43 //1.3 指定分區類 44 job.setPartitionerClass(HashPartitioner.class); 45 job.setNumReduceTasks(1); 46 47 //1.4 TODO 排序、分區 48 49 //1.5 TODO (可選)合併 50 51 //2.2 指定自定義的reduce類 52 job.setReducerClass(MyReducer.class); 53 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 54 job.setOutputValueClass(LongWritable.class); 55 56 //2.3 指定輸出到哪裏 57 FileOutputFormat.setOutputPath(job, outpath); 58 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 59 job.waitForCompletion(true);//把代碼提交給JobTracker執行 60 } 61 static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{ 62 63 @Override 64 protected void map( 65 LongWritable key, 66 Text value, 67 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) 68 throws IOException, InterruptedException { 69 final String[] splited = value.toString().split("\t"); 70 final long k2 = Long.parseLong(splited[0]); 71 final long v2 = Long.parseLong(splited[1]); 72 context.write(new LongWritable(k2),new LongWritable(v2)); 73 } 74 } 75 static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{ 76 77 @Override 78 protected void reduce( 79 LongWritable k2, 80 Iterable<LongWritable> v2s, 81 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) 82 throws IOException, InterruptedException { 83 for(LongWritable v2:v2s){ 84 context.write(k2, v2); 85 } 86 } 87 } 88 }
代碼 1.1數組
運行結果以下圖1.3所示緩存
1 1 2 2 2 1 3 3 3 2 3 1
圖 1.3網絡
從上面圖中運行結果能夠看出,MapReduce默認排序算法只對Key進行了排序,並無對value進行排序,沒有達到咱們的要求,因此要實現咱們的要求,還要咱們自定義一個排序算法app
從上面圖中運行結果能夠知道,MapReduce默認排序算法只對Key進行了排序,並無對value進行排序,沒有達到咱們的要求,因此要實現咱們的要求,還要咱們自定義一個排序算法。在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。若是要想讓v2也進行排序,須要把k2和v2組裝成新的類做爲k 2 ,才能參與比較。因此在這裏咱們新建一個新的類型NewK2類型來封裝原來的k2和v2。代碼如1.2所示。分佈式
1 package sort; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.io.WritableComparable; 14 import org.apache.hadoop.mapreduce.Job; 15 import org.apache.hadoop.mapreduce.Mapper; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 22 23 public class SortApp { 24 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 25 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 26 public static void main(String[] args) throws Exception{ 27 final Configuration configuration = new Configuration(); 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 29 if(fileSystem.exists(new Path(OUT_PATH))){ 30 fileSystem.delete(new Path(OUT_PATH), true); 31 } 32 final Job job = new Job(configuration, SortApp.class.getSimpleName()); 33 //1.1 指定輸入文件路徑 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件 36 37 //1.2指定自定義的Mapper類 38 job.setMapperClass(MyMapper.class); 39 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型 40 job.setMapOutputValueClass(LongWritable.class); 41 42 //1.3 指定分區類 43 job.setPartitionerClass(HashPartitioner.class); 44 job.setNumReduceTasks(1); 45 46 //2.2 指定自定義的reduce類 47 job.setReducerClass(MyReducer.class); 48 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 49 job.setOutputValueClass(LongWritable.class); 50 51 //2.3 指定輸出到哪裏 52 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 53 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 54 job.waitForCompletion(true);//把代碼提交給JobTracker執行 55 } 56 57 58 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 59 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 60 final String[] splited = value.toString().split("\t"); 61 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 62 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 63 context.write(k2, v2); 64 }; 65 } 66 67 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 68 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 69 context.write(new LongWritable(k2.first), new LongWritable(k2.second)); 70 }; 71 } 72 73 /** 74 * 問:爲何實現該類? 75 * 答:由於原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,做爲新的k2 76 * 77 */ 78 static class NewK2 implements WritableComparable<NewK2>{ 79 Long first; 80 Long second; 81 82 public NewK2(){} 83 84 public NewK2(long first, long second){ 85 this.first = first; 86 this.second = second; 87 } 88 89 90 @Override 91 public void readFields(DataInput in) throws IOException { 92 this.first = in.readLong(); 93 this.second = in.readLong(); 94 } 95 96 @Override 97 public void write(DataOutput out) throws IOException { 98 out.writeLong(first); 99 out.writeLong(second); 100 } 101 102 /** 103 * 當k2進行排序時,會調用該方法. 104 * 當第一列不一樣時,升序;當第一列相同時,第二列升序 105 */ 106 @Override 107 public int compareTo(NewK2 o) { 108 final long minus = this.first - o.first; 109 if(minus !=0){ 110 return (int)minus; 111 } 112 return (int)(this.second - o.second); 113 } 114 115 @Override 116 public int hashCode() { 117 return this.first.hashCode()+this.second.hashCode(); 118 } 119 120 @Override 121 public boolean equals(Object obj) { 122 if(!(obj instanceof NewK2)){ 123 return false; 124 } 125 NewK2 oK2 = (NewK2)obj; 126 return (this.first==oK2.first)&&(this.second==oK2.second); 127 } 128 } 129 130 }
代碼 1.2
從上面的代碼中咱們能夠發現,咱們的新類型NewK2實現了WritableComparable接口,其中該接口中有一個compareTo()方法,當對關鍵字進行比較會調用該方法,而咱們就在該方法中實現了咱們想要作的事。
運行結果以下圖1.4所示。
1 1 2 1 2 2 3 1 3 2 3 3
圖 1.4
分組是在MapReduce中Mapper端的第四步,分組也是基於Key進行的,將相同key的value放到一個集合中去。還以上面排序代碼爲例,業務邏輯以下圖2.1所示。在代碼中以NewK2爲關鍵字,每一個鍵都不相同,因此會將數據分爲六組,這樣就不能實現咱們的業務要求,但利用自定義類型NewK2,能夠自定義排序算法的同時咱們也能夠自定義分組算法。
#當第一列相同時,求出第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
------------------- #結果 3 1 2 1 1 1
圖 2.1
因爲業務要求分組是按照第一列分組,可是NewK2的比較規則決定了不能按照第一列分,只能自定義分組比較器,代碼以下2.1所示。
1 package group; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.net.URI; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.RawComparator; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.WritableComparable; 15 import org.apache.hadoop.io.WritableComparator; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.Reducer; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 24 25 public class GroupApp { 26 static final String INPUT_PATH = "hdfs://hadoop:9000/newinput"; 27 static final String OUT_PATH = "hdfs://hadoop:9000/newoutput"; 28 public static void main(String[] args) throws Exception{ 29 final Configuration configuration = new Configuration(); 30 31 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); 32 if(fileSystem.exists(new Path(OUT_PATH))){ 33 fileSystem.delete(new Path(OUT_PATH), true); 34 } 35 final Job job = new Job(configuration, GroupApp.class.getSimpleName()); 36 37 //1.1 指定輸入文件路徑 38 FileInputFormat.setInputPaths(job, INPUT_PATH); 39 job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件 40 41 //1.2指定自定義的Mapper類 42 job.setMapperClass(MyMapper.class); 43 job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //1.3 指定分區類 47 job.setPartitionerClass(HashPartitioner.class); 48 job.setNumReduceTasks(1); 49 50 //1.4 TODO 排序、分區 51 job.setGroupingComparatorClass(MyGroupingComparator.class); 52 //1.5 TODO (可選)合併 53 54 //2.2 指定自定義的reduce類 55 job.setReducerClass(MyReducer.class); 56 job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型 57 job.setOutputValueClass(LongWritable.class); 58 59 //2.3 指定輸出到哪裏 60 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 61 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 62 job.waitForCompletion(true);//把代碼提交給JobTracker執行 63 } 64 65 66 static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ 67 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 68 final String[] splited = value.toString().split("\t"); 69 final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1])); 70 final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); 71 context.write(k2, v2); 72 }; 73 } 74 75 static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{ 76 protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException { 77 long min = Long.MAX_VALUE; 78 for (LongWritable v2 : v2s) { 79 if(v2.get()<min){ 80 min = v2.get(); 81 } 82 } 83 84 context.write(new LongWritable(k2.first), new LongWritable(min)); 85 }; 86 } 87 88 /** 89 * 問:爲何實現該類? 90 * 答:由於原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,做爲新的k2 91 * 92 */ 93 static class NewK2 implements WritableComparable<NewK2>{ 94 Long first; 95 Long second; 96 97 public NewK2(){} 98 99 public NewK2(long first, long second){ 100 this.first = first; 101 this.second = second; 102 } 103 104 105 @Override 106 public void readFields(DataInput in) throws IOException { 107 this.first = in.readLong(); 108 this.second = in.readLong(); 109 } 110 111 @Override 112 public void write(DataOutput out) throws IOException { 113 out.writeLong(first); 114 out.writeLong(second); 115 } 116 117 /** 118 * 當k2進行排序時,會調用該方法. 119 * 當第一列不一樣時,升序;當第一列相同時,第二列升序 120 */ 121 @Override 122 public int compareTo(NewK2 o) { 123 final long minus = this.first - o.first; 124 if(minus !=0){ 125 return (int)minus; 126 } 127 return (int)(this.second - o.second); 128 } 129 130 @Override 131 public int hashCode() { 132 return this.first.hashCode()+this.second.hashCode(); 133 } 134 135 @Override 136 public boolean equals(Object obj) { 137 if(!(obj instanceof NewK2)){ 138 return false; 139 } 140 NewK2 oK2 = (NewK2)obj; 141 return (this.first==oK2.first)&&(this.second==oK2.second); 142 } 143 } 144 145 static class MyGroupingComparator implements RawComparator<NewK2>{ 146 147 @Override 148 public int compare(NewK2 o1, NewK2 o2) { 149 return (int)(o1.first - o2.first); 150 } 151 152 @Override 153 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, 154 int arg4, int arg5) { 155 return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8); 156 } 157 158 } 159 }
代碼2.1
從上面的代碼中咱們能夠知道,咱們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,RawComparator又繼承了Comparator接口,這兩個接口的代碼以下:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在類MyGroupingComparator中分別對着兩個接口中的方法進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。在該方法一共有六個參數,以下:
* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量
*
* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量
在於NewK2中存儲着兩個long類型,每一個long類型爲8字節,.compareBytes()方法的參數以下:.compareBytes(arg0, arg1, 8, arg3, arg4, 8);由於比較的是第一列,因此讀取的偏移量爲8字節。因爲咱們要求出每一分組的最小值,因此還重寫Reduce方法,求出每一分租的最小值。最後的運行結果以下圖2.1所示
1 1 2 1 3 1
圖 2.1
Shuffle是MapReduce過程的核心,瞭解Shuffle很是有助於理解MapReduce的工做原理。huffle的正常意思是洗牌或弄亂,可能你們更熟悉的是Java API裏的Collections.shuffle(List)方法,它會隨機地打亂參數list裏的元素順序。若是你不知道MapReduce裏Shuffle是什麼,那麼請看這張圖:
在該圖中分爲Map任務和Reduce任務兩個部分,從map端到reduce端的紅色和綠色的線表示數據流的一個過程,也就是從<K1,V1>到<K2,V2>到<K3,V3>的一個過程。
Map端
<1>在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數據,每個InputSplit都會分配一個Mapper任務,Mapper任務結束後產生<K2,V 2>的輸出,這些輸出顯存放在緩存中,每一個map有一個環形內存緩衝區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個後臺線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。
<2>寫磁盤前,要partition,sort。經過分區,將不一樣類型的數據分開處理,以後對不一樣分區的數據進行排序,若是有Combiner,還要對排序後的數據進行co mbine。等最後記錄寫完,將所有溢出文件合併爲一個分區且排序的文件。
<3>最後將磁盤中的數據送到Reduce中,從圖中能夠看出Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其餘Reducer任務中。而圖示的Reducer任務的其餘的三個輸入來自其餘的Map輸出。
Reduce端
<1>Reducer經過Http方式獲得輸出文件的分區。
<2>TaskTracker爲分區文件運行Reduce任務。複製階段把Map輸出複製到Reducer的內存或磁盤。一個Map任務完成,Reduce就開始複製輸出。
<3>排序階段合併map輸出。而後走Reduce階段。
Hadoop的壓縮過程並非一個必須的過程,但爲何還要使用呢?在哪些階段可使用,有什麼好處呢?
<1>在Map輸出到Reduce時可使用,由於map端輸出的數據要經過網絡輸出到Reduce端,爲了減小傳輸的數據量咱們能夠採用壓縮的方式來減小延遲。
<2>在整個做業的輸出也可使用
Codec爲是壓縮,解壓縮的算法的實現,在Hadoop中,codec由CompressionCode的實現來表示。下面是一些實現,以下圖3.1所示。
圖 3.1
輸出的壓縮屬性,和使用方式:以下圖3.2,3.3所示。
圖 3.2
圖3.3
<1>單詞計數(已介紹)
<2>數據去重(去掉重複數據不難理解吧)
<3>排序(在上節已介紹)
<4>Top K(是求最值問題,下面會介紹)
下面算法,跟咱們數據庫中的方法比較相似,
<5>選擇---行
數據庫中:該操做的結果應該是一行一行的顯示,至關於where。
MapReduce的實現:以求最值爲例,從100萬數據中選出一行最小值。
<6>投影---列
數據庫中:該操做的結果應該是一列一列的顯示,至關於select。
MapReduce的實現:以求處理手機上網日誌爲例,從其11個字段選出了五個字段來顯示咱們的手機上網流量。
<7>分組
數據庫中:至關於group by。
MapReduce的實現:至關於分區,以求處理手機上網日誌爲例,喊手機號和非手機號分爲兩組。
<9>多表鏈接
MapReduce中:在MapReduce中能夠同時進入多個文件進行操做,其中兩個文件有關係就至關於錶鏈接。那麼如何知道文件之間的關係呢?我能夠經過map函數的context參數來得到文件路徑代碼以下
final FileSplit inputSplit = (FileSplit) context.getInputSplit(); final String path = inputSplit.getPath().toString();
<10>單表關聯
經過上面的分析咱們能夠知道,sql中的方法也能夠在MapReduce中實現,也就是說當把關係型數據庫中的算法所有在MapReduce中實現時,也就意味着sql的使用範圍擴展到了Hadoop,也就是大數據領域,這樣意義是很是大的。
求最值的方法,在咱們的生活中應用很是的廣,好比找出高考中的最高分,也就是狀元,就很是相似分佈式計算,要選出全國的最高分就首先選出各省份的,要選出各省份就得選出各市級的等等,而這些數據量很是大,沒法直接所有加載到內存中,面對如此大的數據量我就能夠考慮使用分佈式計算的方式。咱們以從100萬的數據中求出其中的最大值爲例,介紹該方法。
求最值最簡單的辦法就是對該文件進行一次遍歷得出最值,可是現實中數據比量比較大,這種方法不能實現。在傳統的MapReduce思想中,將文件的數據通過map迭代出來送到reduce中,在Reduce中求出最大值。但這個方法顯然不夠優化,咱們可採用「分而治之」的思想,不須要map的全部數據所有送到reduce中,咱們能夠在map中先求出最大值,將該map任務的最大值送reduce中,這樣就減小了數據的傳輸量。那麼何時該把這個數據寫出去呢?咱們知道,每個鍵值對都會調用一次map(),因爲數據量大調用map()的次數也就多了,顯然在map()函數中將該數據寫出去是不明智的,因此最好的辦法該Mapper任務結束後將該數據寫出去。咱們又知道,當Mapper/Reducer任務結束後會調用cleanup函數,因此咱們能夠在該函數中將該數據寫出去。瞭解了這些咱們能夠看一下程序的代碼如3.1所示。
1 package suanfa; 2 3 import java.net.URI; 4 5 import mapreduce.WordCountApp; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.LongWritable; 11 import org.apache.hadoop.io.NullWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 20 public class TopKApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input2"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/out2"; 23 24 public static void main(String[] args) throws Exception { 25 Configuration conf = new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outPath = new Path(OUT_PATH); 28 if(fileSystem.exists(outPath)){ 29 fileSystem.delete(outPath, true); 30 } 31 32 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 33 FileInputFormat.setInputPaths(job, INPUT_PATH); 34 job.setMapperClass(MyMapper.class); 35 job.setReducerClass(MyReducer.class); 36 job.setOutputKeyClass(LongWritable.class); 37 job.setOutputValueClass(NullWritable.class); 38 FileOutputFormat.setOutputPath(job, outPath); 39 job.waitForCompletion(true); 40 } 41 static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{ 42 long max = Long.MIN_VALUE; 43 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 44 final long temp = Long.parseLong(v1.toString()); 45 if(temp>max){ 46 max = temp; 47 } 48 }; 49 50 protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException { 51 context.write(new LongWritable(max), NullWritable.get()); 52 }; 53 } 54 55 static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{ 56 long max = Long.MIN_VALUE; 57 protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException { 58 final long temp = k2.get(); 59 if(temp>max){ 60 max = temp; 61 } 62 }; 63 64 protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException { 65 context.write(new LongWritable(max), NullWritable.get()); 66 }; 67 } 68 }
代碼3.1
運行結果爲:32767,也就是咱們數據中的最大值