Hadoop日記Day18---MapReduce排序分組

本節所用到的數據下載地址爲:http://pan.baidu.com/s/1bnfELmZjava

MapReduce的排序分組任務與要求

  咱們知道排序分組是MapReduce中Mapper端的第四步,其中分組排序都是基於Key的,咱們能夠經過下面這幾個例子來體現出來。其中的數據和任務以下圖1.1,1.2所示。算法

#首先按照第一列升序排列,當第一列相同時,第二列升序排列
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#結果
1    1
2    1
2    2
3    1
3    2
3    3

圖 1.1 排序sql

#當第一列相同時,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#結果
3    1
2    1
1    1

圖 1.2 分組數據庫

1、 排序算法

1.1 MapReduce默認排序算法

  使用MapReduce默認排序算法代碼以下1.1所示,在代碼中我將第一列做爲鍵,第二列做爲值。apache

 1 package sort;
 2 
 3 import java.io.IOException;
 4 import java.net.URI;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.FileStatus;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20 
21 public class SortApp {
22     private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
23     private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
24     public static void main(String[] args) throws Exception {
25         Configuration conf=new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outpath = new Path(OUT_PATH);
28         if(fileSystem.exists(outpath)){
29             fileSystem.delete(outpath,true);
30         }
31         
32         final Job job = new Job(conf,SortApp.class.getSimpleName());
33         
34         //1.1 指定輸入文件路徑
35         FileInputFormat.setInputPaths(job, INPUT_PATH);        
36         job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件
37                 
38         //1.2指定自定義的Mapper類
39         job.setMapperClass(MyMapper.class);        
40         job.setMapOutputKeyClass(LongWritable.class);//指定輸出<k2,v2>的類型
41         job.setMapOutputValueClass(LongWritable.class);
42                 
43         //1.3 指定分區類
44         job.setPartitionerClass(HashPartitioner.class);
45         job.setNumReduceTasks(1);
46                 
47         //1.4 TODO 排序、分區
48                 
49         //1.5  TODO (可選)合併
50                 
51         //2.2 指定自定義的reduce類
52         job.setReducerClass(MyReducer.class);        
53         job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型
54         job.setOutputValueClass(LongWritable.class);
55                 
56         //2.3 指定輸出到哪裏
57         FileOutputFormat.setOutputPath(job, outpath);        
58         job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類                        
59         job.waitForCompletion(true);//把代碼提交給JobTracker執行        
60     }
61     static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{
62 
63         @Override
64         protected void map(
65                 LongWritable key,
66                 Text value,
67                 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
68                 throws IOException, InterruptedException {
69             final String[] splited = value.toString().split("\t");
70             final long k2 = Long.parseLong(splited[0]);
71             final long v2 = Long.parseLong(splited[1]);
72             context.write(new LongWritable(k2),new LongWritable(v2));
73         }    
74     }
75     static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{
76 
77         @Override
78         protected void reduce(
79                 LongWritable k2,
80                 Iterable<LongWritable> v2s,
81                 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
82                 throws IOException, InterruptedException {
83             for(LongWritable v2:v2s){
84                 context.write(k2, v2);
85             }            
86         }    
87     }
88 }
View Code

代碼 1.1數組

  運行結果以下圖1.3所示緩存

1    1
2    2
2    1
3    3
3    2
3    1

圖 1.3網絡

  從上面圖中運行結果能夠看出,MapReduce默認排序算法只對Key進行了排序,並無對value進行排序,沒有達到咱們的要求,因此要實現咱們的要求,還要咱們自定義一個排序算法app

1.2 自定義排序算法

  從上面圖中運行結果能夠知道,MapReduce默認排序算法只對Key進行了排序,並無對value進行排序,沒有達到咱們的要求,因此要實現咱們的要求,還要咱們自定義一個排序算法。在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。若是要想讓v2也進行排序,須要把k2和v2組裝成新的類做爲k 2 ,才能參與比較。因此在這裏咱們新建一個新的類型NewK2類型來封裝原來的k2和v2。代碼如1.2所示。分佈式

  1 package sort;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.io.WritableComparable;
 14 import org.apache.hadoop.mapreduce.Job;
 15 import org.apache.hadoop.mapreduce.Mapper;
 16 import org.apache.hadoop.mapreduce.Reducer;
 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 22 
 23 public class SortApp {
 24     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
 25     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
 26     public static void main(String[] args) throws Exception{
 27         final Configuration configuration = new Configuration();
 28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
 29         if(fileSystem.exists(new Path(OUT_PATH))){
 30             fileSystem.delete(new Path(OUT_PATH), true);
 31         }
 32         final Job job = new Job(configuration, SortApp.class.getSimpleName());
 33         //1.1 指定輸入文件路徑
 34         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 35         job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件
 36         
 37         //1.2指定自定義的Mapper類
 38         job.setMapperClass(MyMapper.class);        
 39         job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型
 40         job.setMapOutputValueClass(LongWritable.class);
 41         
 42         //1.3 指定分區類
 43         job.setPartitionerClass(HashPartitioner.class);
 44         job.setNumReduceTasks(1);
 45         
 46         //2.2 指定自定義的reduce類
 47         job.setReducerClass(MyReducer.class);        
 48         job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型
 49         job.setOutputValueClass(LongWritable.class);
 50         
 51         //2.3 指定輸出到哪裏
 52         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
 53         job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類
 54         job.waitForCompletion(true);//把代碼提交給JobTracker執行
 55     }
 56 
 57     
 58     static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
 59         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 60             final String[] splited = value.toString().split("\t");
 61             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
 62             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 63             context.write(k2, v2);
 64         };
 65     }
 66     
 67     static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
 68         protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 69             context.write(new LongWritable(k2.first), new LongWritable(k2.second));
 70         };
 71     }
 72     
 73     /**
 74      * 問:爲何實現該類?
 75      * 答:由於原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,做爲新的k2
 76      *
 77      */
 78     static class  NewK2 implements WritableComparable<NewK2>{
 79         Long first;
 80         Long second;
 81         
 82         public NewK2(){}
 83         
 84         public NewK2(long first, long second){
 85             this.first = first;
 86             this.second = second;
 87         }
 88         
 89         
 90         @Override
 91         public void readFields(DataInput in) throws IOException {
 92             this.first = in.readLong();
 93             this.second = in.readLong();
 94         }
 95 
 96         @Override
 97         public void write(DataOutput out) throws IOException {
 98             out.writeLong(first);
 99             out.writeLong(second);
100         }
101 
102         /**
103          * 當k2進行排序時,會調用該方法.
104          * 當第一列不一樣時,升序;當第一列相同時,第二列升序
105          */
106         @Override
107         public int compareTo(NewK2 o) {
108             final long minus = this.first - o.first;
109             if(minus !=0){
110                 return (int)minus;
111             }
112             return (int)(this.second - o.second);
113         }
114         
115         @Override
116         public int hashCode() {
117             return this.first.hashCode()+this.second.hashCode();
118         }
119         
120         @Override
121         public boolean equals(Object obj) {
122             if(!(obj instanceof NewK2)){
123                 return false;
124             }
125             NewK2 oK2 = (NewK2)obj;
126             return (this.first==oK2.first)&&(this.second==oK2.second);
127         }
128     }
129     
130 }
View Code

代碼 1.2

  從上面的代碼中咱們能夠發現,咱們的新類型NewK2實現了WritableComparable接口,其中該接口中有一個compareTo()方法,當對關鍵字進行比較會調用該方法,而咱們就在該方法中實現了咱們想要作的事。

  運行結果以下圖1.4所示。

1    1
2    1
2    2
3    1
3    2
3    3

圖 1.4

2、分組算法

2.1 MapReduce默認分組

  分組是在MapReduce中Mapper端的第四步,分組也是基於Key進行的,將相同key的value放到一個集合中去。還以上面排序代碼爲例,業務邏輯以下圖2.1所示。在代碼中以NewK2爲關鍵字,每一個鍵都不相同,因此會將數據分爲六組,這樣就不能實現咱們的業務要求,但利用自定義類型NewK2,能夠自定義排序算法的同時咱們也能夠自定義分組算法。

#當第一列相同時,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
------------------- #結果 3 1 2 1 1 1

圖 2.1

2.2 自定義分組比較器

  因爲業務要求分組是按照第一列分組,可是NewK2的比較規則決定了不能按照第一列分,只能自定義分組比較器,代碼以下2.1所示。

  1 package group;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.RawComparator;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hadoop.io.WritableComparable;
 15 import org.apache.hadoop.io.WritableComparator;
 16 import org.apache.hadoop.mapreduce.Job;
 17 import org.apache.hadoop.mapreduce.Mapper;
 18 import org.apache.hadoop.mapreduce.Reducer;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 24 
 25 public class GroupApp {
 26     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
 27     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
 28     public static void main(String[] args) throws Exception{
 29         final Configuration configuration = new Configuration();
 30         
 31         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
 32         if(fileSystem.exists(new Path(OUT_PATH))){
 33             fileSystem.delete(new Path(OUT_PATH), true);
 34         }        
 35         final Job job = new Job(configuration, GroupApp.class.getSimpleName());
 36         
 37         //1.1 指定輸入文件路徑
 38         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 39         job.setInputFormatClass(TextInputFormat.class);//指定哪一個類用來格式化輸入文件
 40         
 41         //1.2指定自定義的Mapper類
 42         job.setMapperClass(MyMapper.class);        
 43         job.setMapOutputKeyClass(NewK2.class);//指定輸出<k2,v2>的類型
 44         job.setMapOutputValueClass(LongWritable.class);
 45         
 46         //1.3 指定分區類
 47         job.setPartitionerClass(HashPartitioner.class);
 48         job.setNumReduceTasks(1);
 49         
 50         //1.4 TODO 排序、分區
 51         job.setGroupingComparatorClass(MyGroupingComparator.class);
 52         //1.5  TODO (可選)合併
 53         
 54         //2.2 指定自定義的reduce類
 55         job.setReducerClass(MyReducer.class);        
 56         job.setOutputKeyClass(LongWritable.class);//指定輸出<k3,v3>的類型
 57         job.setOutputValueClass(LongWritable.class);
 58         
 59         //2.3 指定輸出到哪裏
 60         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
 61         job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類        
 62         job.waitForCompletion(true);//把代碼提交給JobTracker執行
 63     }
 64 
 65     
 66     static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
 67         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 68             final String[] splited = value.toString().split("\t");
 69             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
 70             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 71             context.write(k2, v2);
 72         };
 73     }
 74     
 75     static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
 76         protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 77             long min = Long.MAX_VALUE;
 78             for (LongWritable v2 : v2s) {
 79                 if(v2.get()<min){
 80                     min = v2.get();
 81                 }
 82             }
 83             
 84             context.write(new LongWritable(k2.first), new LongWritable(min));
 85         };
 86     }
 87     
 88     /**
 89      * 問:爲何實現該類?
 90      * 答:由於原來的v2不能參與排序,把原來的k2和v2封裝到一個類中,做爲新的k2
 91      *
 92      */
 93     static class  NewK2 implements WritableComparable<NewK2>{
 94         Long first;
 95         Long second;
 96         
 97         public NewK2(){}
 98         
 99         public NewK2(long first, long second){
100             this.first = first;
101             this.second = second;
102         }
103         
104         
105         @Override
106         public void readFields(DataInput in) throws IOException {
107             this.first = in.readLong();
108             this.second = in.readLong();
109         }
110 
111         @Override
112         public void write(DataOutput out) throws IOException {
113             out.writeLong(first);
114             out.writeLong(second);
115         }
116 
117         /**
118          * 當k2進行排序時,會調用該方法.
119          * 當第一列不一樣時,升序;當第一列相同時,第二列升序
120          */
121         @Override
122         public int compareTo(NewK2 o) {
123             final long minus = this.first - o.first;
124             if(minus !=0){
125                 return (int)minus;
126             }
127             return (int)(this.second - o.second);
128         }
129         
130         @Override
131         public int hashCode() {
132             return this.first.hashCode()+this.second.hashCode();
133         }
134         
135         @Override
136         public boolean equals(Object obj) {
137             if(!(obj instanceof NewK2)){
138                 return false;
139             }
140             NewK2 oK2 = (NewK2)obj;
141             return (this.first==oK2.first)&&(this.second==oK2.second);
142         }
143     }
144     
145     static class MyGroupingComparator implements RawComparator<NewK2>{
146 
147         @Override
148         public int compare(NewK2 o1, NewK2 o2) {
149             return (int)(o1.first - o2.first);
150         }
151     
152         @Override
153         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
154                 int arg4, int arg5) {
155             return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
156         }
157         
158     }
159 }
View Code

代碼2.1

  從上面的代碼中咱們能夠知道,咱們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,RawComparator又繼承了Comparator接口,這兩個接口的代碼以下:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

}
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

  在類MyGroupingComparator中分別對着兩個接口中的方法進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。在該方法一共有六個參數,以下:
         * @param arg0 表示第一個參與比較的字節數組
         * @param arg1 表示第一個參與比較的字節數組的起始位置
         * @param arg2 表示第一個參與比較的字節數組的偏移量
         *
         * @param arg3 表示第二個參與比較的字節數組
         * @param arg4 表示第二個參與比較的字節數組的起始位置
         * @param arg5 表示第二個參與比較的字節數組的偏移量

  在於NewK2中存儲着兩個long類型,每一個long類型爲8字節,.compareBytes()方法的參數以下:.compareBytes(arg0, arg1, 8, arg3, arg4, 8);由於比較的是第一列,因此讀取的偏移量爲8字節。因爲咱們要求出每一分組的最小值,因此還重寫Reduce方法,求出每一分租的最小值。最後的運行結果以下圖2.1所示

1    1
2    1
3    1

圖 2.1

3、MapReduce的一些算法

3.1 MapReduce中Shuffle過程

  Shuffle是MapReduce過程的核心,瞭解Shuffle很是有助於理解MapReduce的工做原理。huffle的正常意思是洗牌或弄亂,可能你們更熟悉的是Java API裏的Collections.shuffle(List)方法,它會隨機地打亂參數list裏的元素順序。若是你不知道MapReduce裏Shuffle是什麼,那麼請看這張圖:

  在該圖中分爲Map任務和Reduce任務兩個部分,從map端到reduce端的紅色和綠色的線表示數據流的一個過程,也就是從<K1,V1>到<K2,V2>到<K3,V3>的一個過程。

Map端

  <1>在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數據,每個InputSplit都會分配一個Mapper任務,Mapper任務結束後產生<K2,V 2>的輸出,這些輸出顯存放在緩存中,每一個map有一個環形內存緩衝區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個後臺線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。

  <2>寫磁盤前,要partition,sort。經過分區,將不一樣類型的數據分開處理,以後對不一樣分區的數據進行排序,若是有Combiner,還要對排序後的數據進行co mbine。等最後記錄寫完,將所有溢出文件合併爲一個分區且排序的文件。

  <3>最後將磁盤中的數據送到Reduce中,從圖中能夠看出Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其餘Reducer任務中。而圖示的Reducer任務的其餘的三個輸入來自其餘的Map輸出。

Reduce端

  <1>Reducer經過Http方式獲得輸出文件的分區。
  <2>TaskTracker爲分區文件運行Reduce任務。複製階段把Map輸出複製到Reducer的內存或磁盤。一個Map任務完成,Reduce就開始複製輸出。
  <3>排序階段合併map輸出。而後走Reduce階段。

3.2 Hadoop壓縮算法

3.2.1 算法介紹

  Hadoop的壓縮過程並非一個必須的過程,但爲何還要使用呢?在哪些階段可使用,有什麼好處呢?
      <1>在Map輸出到Reduce時可使用,由於map端輸出的數據要經過網絡輸出到Reduce端,爲了減小傳輸的數據量咱們能夠採用壓縮的方式來減小延遲。
      <2>在整個做業的輸出也可使用
  Codec爲是壓縮,解壓縮的算法的實現,在Hadoop中,codec由CompressionCode的實現來表示。下面是一些實現,以下圖3.1所示。

 

圖 3.1

3.2.2  MapReduce的輸出進行壓縮

  輸出的壓縮屬性,和使用方式:以下圖3.2,3.3所示。

 

圖 3.2

 

圖3.3

3.3 常見算法

3.3.1 MapReduce常見算介紹

<1>單詞計數(已介紹)
<2>數據去重(去掉重複數據不難理解吧)
<3>排序(在上節已介紹)
<4>Top K(是求最值問題,下面會介紹)

下面算法,跟咱們數據庫中的方法比較相似,
<5>選擇---行

    數據庫中:該操做的結果應該是一行一行的顯示,至關於where。

    MapReduce的實現:以求最值爲例,從100萬數據中選出一行最小值。
<6>投影---列

    數據庫中:該操做的結果應該是一列一列的顯示,至關於select。    

    MapReduce的實現:以求處理手機上網日誌爲例,從其11個字段選出了五個字段來顯示咱們的手機上網流量。
<7>分組

    數據庫中:至關於group by。        

    MapReduce的實現:至關於分區,以求處理手機上網日誌爲例,喊手機號和非手機號分爲兩組。
<9>多表鏈接

    MapReduce中:在MapReduce中能夠同時進入多個文件進行操做,其中兩個文件有關係就至關於錶鏈接。那麼如何知道文件之間的關係呢?我能夠經過map函數的context參數來得到文件路徑代碼以下

  final FileSplit inputSplit = (FileSplit) context.getInputSplit();
  final String path = inputSplit.getPath().toString();

<10>單表關聯  

  經過上面的分析咱們能夠知道,sql中的方法也能夠在MapReduce中實現,也就是說當把關係型數據庫中的算法所有在MapReduce中實現時,也就意味着sql的使用範圍擴展到了Hadoop,也就是大數據領域,這樣意義是很是大的。

3.3.2 Top K 最值案例

  求最值的方法,在咱們的生活中應用很是的廣,好比找出高考中的最高分,也就是狀元,就很是相似分佈式計算,要選出全國的最高分就首先選出各省份的,要選出各省份就得選出各市級的等等,而這些數據量很是大,沒法直接所有加載到內存中,面對如此大的數據量我就能夠考慮使用分佈式計算的方式。咱們以從100萬的數據中求出其中的最大值爲例,介紹該方法。

  求最值最簡單的辦法就是對該文件進行一次遍歷得出最值,可是現實中數據比量比較大,這種方法不能實現。在傳統的MapReduce思想中,將文件的數據通過map迭代出來送到reduce中,在Reduce中求出最大值。但這個方法顯然不夠優化,咱們可採用「分而治之」的思想,不須要map的全部數據所有送到reduce中,咱們能夠在map中先求出最大值,將該map任務的最大值送reduce中,這樣就減小了數據的傳輸量。那麼何時該把這個數據寫出去呢?咱們知道,每個鍵值對都會調用一次map(),因爲數據量大調用map()的次數也就多了,顯然在map()函數中將該數據寫出去是不明智的,因此最好的辦法該Mapper任務結束後將該數據寫出去。咱們又知道,當Mapper/Reducer任務結束後會調用cleanup函數,因此咱們能夠在該函數中將該數據寫出去。瞭解了這些咱們能夠看一下程序的代碼如3.1所示。

 1 package suanfa;
 2 
 3 import java.net.URI;
 4 
 5 import mapreduce.WordCountApp;
 6 
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.NullWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Job;
14 import org.apache.hadoop.mapreduce.Mapper;
15 import org.apache.hadoop.mapreduce.Reducer;
16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
19 
20 public class TopKApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out2";
23     
24     public static void main(String[] args) throws Exception {
25         Configuration conf = new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outPath = new Path(OUT_PATH);
28         if(fileSystem.exists(outPath)){
29             fileSystem.delete(outPath, true);
30         }
31         
32         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
33         FileInputFormat.setInputPaths(job, INPUT_PATH);
34         job.setMapperClass(MyMapper.class);
35         job.setReducerClass(MyReducer.class);
36         job.setOutputKeyClass(LongWritable.class);
37         job.setOutputValueClass(NullWritable.class);
38         FileOutputFormat.setOutputPath(job, outPath);
39         job.waitForCompletion(true);
40     }
41     static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
42         long max = Long.MIN_VALUE;
43         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
44             final long temp = Long.parseLong(v1.toString());
45             if(temp>max){
46                 max = temp;
47             }
48         };
49         
50         protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
51             context.write(new LongWritable(max), NullWritable.get());
52         };
53     }
54 
55     static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
56         long max = Long.MIN_VALUE;
57         protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException {
58             final long temp = k2.get();
59             if(temp>max){
60                 max = temp;
61             }
62         };
63         
64         protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
65             context.write(new LongWritable(max), NullWritable.get());
66         };
67     }        
68 }
View Code

代碼3.1

運行結果爲:32767,也就是咱們數據中的最大值

相關文章
相關標籤/搜索