MapReduce的二次排序

附錄以前總結的一個例子:html

http://www.cnblogs.com/DreamDrive/p/7398455.htmljava

另外兩個有價值的博文:apache

http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html數組

http://blog.csdn.net/heyutao007/article/details/5890103ide

一.MR的二次排序的需求說明

在mapreduce操做時,shuffle階段會屢次根據key值排序。可是在shuffle分組後,相同key值的values序列的順序是不肯定的(以下圖)。若是想要此時value值也是排序好的,這種需求就是二次排序。函數

二.測試的文件數據

a 1
a 5
a 7
a 9
b 3
b 8
b 10

 

 

 

 

 

三.未通過二次排序的輸出結果

a    9
a    7
a    5
a    1
b    10
b    8
b    3

 

 

 

 

 

四.第一種實現思路

直接在reduce端對分組後的values進行排序。oop

reduce關鍵代碼測試

 1 @Override
 2      public void reduce(Text key, Iterable<IntWritable> values, Context context)
 3              throws IOException, InterruptedException {
 4 
 5           List<Integer> valuesList = new ArrayList<Integer>();
 6 
 7           // 取出value
 8           for(IntWritable value : values) {
 9               valuesList.add(value.get());
10           }
11           // 進行排序
12           Collections.sort(valuesList);
13 
14           for(Integer value : valuesList) {
15              context.write(key, new IntWritable(value));
16           }
17 
18      }

 輸出結果:this

a    1
a    5
a    7
a    9
b    3
b    8
b    10

很容易發現,這樣把排序工做都放到reduce端完成,當values序列長度很是大的時候,會對CPU和內存形成極大的負載。spa

注意的地方(容易被「坑」)

在reduce端對values進行迭代的時候,不要直接存儲value值或者key值,由於reduce方法會反覆執行屢次,但key和value相關的對象只有兩個,reduce會反覆重用這兩個對象。須要用相應的數據類型.get()取出後再存儲。

五.第二種實現思路

將map端輸出的<key,value>中的key和value組合成一個新的key(稱爲newKey),value值不變。這裏就變成<(key,value),value>,在針對newKey排序的時候,若是key相同,就再對value進行排序。

須要自定義的地方
  1.自定義數據類型實現組合key
    實現方式:繼承WritableComparable
  2.自定義partioner,造成newKey後保持分區規則任然按照key進行。保證不打亂原來的分區。
    實現方式:繼承partitioner
  3.自定義分組,保持分組規則任然按照key進行。不打亂原來的分組
    實現方式:繼承RawComparator
自定義數據類型關鍵代碼

 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4 import org.apache.hadoop.io.WritableComparable;
 5 
 6 public class PairWritable implements WritableComparable<PairWritable> {
 7  // 組合key
 8    private String first;
 9    private int second;
10 
11  public PairWritable() {
12  }
13 
14  public PairWritable(String first, int second) {
15      this.set(first, second);
16  }
17 
18  /**
19   * 方便設置字段
20   */
21  public void set(String first, int second) {
22      this.first = first;
23      this.second = second;
24  }
25 
26  /**
27   * 反序列化
28   */
29  @Override
30  public void readFields(DataInput arg0) throws IOException {
31      this.first = arg0.readUTF();
32      this.second = arg0.readInt();
33  }
34  /**
35   * 序列化
36   */
37  @Override
38  public void write(DataOutput arg0) throws IOException {
39      arg0.writeUTF(first);
40      arg0.writeInt(second);
41  }
42 
43  /*
44   * 重寫比較器
45   */
46  public int compareTo(PairWritable o) {
47      int comp = this.first.compareTo(o.first);
48 
49      if(comp != 0) {
50          return comp;
51      } else { // 若第一個字段相等,則比較第二個字段
52          return Integer.valueOf(this.second).compareTo(
53                  Integer.valueOf(o.getSecond()));
54      }
55  }
56 
57  public int getSecond() {
58      return second;
59  }
60  public void setSecond(int second) {
61      this.second = second;
62  }
63  public String getFirst() {
64      return first;
65  }
66  public void setFirst(String first) {
67      this.first = first;
68  }

自定義分區規則

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.mapreduce.Partitioner;
 3 
 4 public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {
 5 
 6     @Override
 7     public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
 8         /* 
 9          * 默認的實現 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
10          * 讓key中first字段做爲分區依據
11          */
12         return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
13     }
14 }

 

自定義分組比較器

 1 import org.apache.hadoop.io.RawComparator;
 2 import org.apache.hadoop.io.WritableComparator;
 3 
 4 public class SecondGroupComparator implements RawComparator<PairWritable> {
 5 
 6     /*
 7      * 對象比較
 8      */
 9     public int compare(PairWritable o1, PairWritable o2) {
10         return o1.getFirst().compareTo(o2.getFirst());
11     }
12 
13     /*
14      * 字節比較
15      * arg0,arg3爲要比較的兩個字節數組
16      * arg1,arg2表示第一個字節數組要進行比較的收尾位置,arg4,arg5表示第二個
17      * 從第一個字節比到組合key中second的前一個字節,由於second爲int型,因此長度爲4
18      */
19     public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
20         return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
21     }
22

 

map關鍵代碼

 1 private PairWritable mapOutKey = new PairWritable();
 2       private IntWritable mapOutValue = new IntWritable();
 3       public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 4           String lineValue = value.toString();
 5           String[] strs = lineValue.split("\t");
 6 
 7           //設置組合key和value ==> <(key,value),value>
 8           mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
 9           mapOutValue.set(Integer.valueOf(strs[1]));
10 
11           context.write(mapOutKey, mapOutValue);
12       }

reduce關鍵代碼

 1 private Text outPutKey = new Text(); 
 2       public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
 3               throws IOException, InterruptedException {
 4           //迭代輸出
 5           for(IntWritable value : values) {
 6               outPutKey.set(key.getFirst());
 7               context.write(outPutKey, value);
 8           }
 9 
10       }

輸出結果:

a    1
a    5
a    7
a    9
b    3
b    8
b    10

原理:

在map階段:

使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。

本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的一行的行號做爲key,這一行的文本做爲value。這就是自定義Map的輸入是<LongWritable, Text>的緣由。

而後調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。

在map階段的最後,會先調用job.setPartitionerClass對這個List進行分區,每一個分區映射到一個reducer。每一個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。

能夠看到,這自己就是一個二次排序。若是沒有經過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。


在reduce階段:

reducer接收到全部映射到這個reducer的map輸出後,也是會調用job.setSortComparatorClass設置的key比較函數類對全部數據對排序。

而後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數類。

只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的全部key的第一個key。

最後就是進入Reducer的reduce方法,reduce方法的輸入是全部的(key和它的value迭代器)。一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。  

相關文章
相關標籤/搜索