[大牛翻譯系列]Hadoop(5)MapReduce 排序:次排序(Secondary sort)

4.2 排序(SORT)

在MapReduce中,排序的目的有兩個:算法

  1. MapReduce能夠經過排序將Map輸出的鍵分組。而後每組鍵調用一次reduce。
  2. 在某些須要排序的特定場景中,用戶能夠將做業(job)的所有輸出進行整體排序。

例如:須要瞭解前N個最受歡迎的用戶或網頁的數據分析工做。app

在這一節中,有兩個場景須要對MapReduce的排序行爲進行優化。ide

  1. 次排序(Secondary sort)
  2. 總排序(Total order sorting)

次排序能夠根據reduce的鍵對它的值進行排序。若是要求一些數據先於另一些數據到達reduce,次排序就頗有用。(這一章在講解優化過的重分區鏈接中也提到了這樣的場景。)另外一個場景中,須要將做業的輸出根據兩個鍵進行排序,一個鍵的優先級高於另一個鍵(secondary key)。這個場景也能夠用到次排序。例如:將股票數據先根據股票標誌進行主排序(primary sort),而後根據股票配額進行次排序。本書不少技術中將會運用次排序,如重分區鏈接的優化,朋友圖算法等。函數

這一節第二部分中,將探討對reduce的輸出的所有數據進行整體排序。這在分析數據集中的前N個元素或後N個元素時會比較有用。oop

 

4.2.1 次排序(Secondary sort)

在前一節(MapReduce鏈接)中,次排序用於使一部分數據先於另一部分到達reduce。做爲基礎知識,學習次排序前須要瞭解MapReduce中的數據整理和數據流。圖4.12說明了三個影響數據整理和數據流(分區,排序,分組)的元素,而且說明了這些元素如何整合到MapReduce中。學習

 

 

在map輸出收集(output collection)階段,由分區器(Partitioner)選擇哪一個reduce應該接收map的輸出。map輸出的各個分區的數據,由RawComparator進行排序。Reduce端也用RawComparator進行排序。而後,由RawComparator對排序好的數據進行分組。測試

 

技術21 實現次排序優化

對於某個map的鍵的全部值,若是須要其中一部分值先於另一部分值到達reduce,就能夠用到次排序。次排序還用在了本書的第7章中的朋友圖算法,和通過優化的重分區排序中。this

 

問題spa

在發送給某個reduce的數據中,須要對某個天然鍵(natural key)的值進行排序。

 

方案

這個技術中將應用到自定義分區類,排序比較類(sort comparator),分組比較類(grouping comparator)。這些是實現次排序的基礎。

 

討論

在這個技術中,使用次排序來對人的名字進行排序。具體步驟是:先用主排序對人的姓排序,再用次排序對人的名字排序。

次排序須要在map函數中生成組合鍵(composite key)做爲輸出鍵。

組合輸出鍵包括兩個部分:

  1. 天然鍵,用於鏈接。
  2. 次鍵(secondary key),用於對隸屬於天然鍵的值進行排序。排序後的結果將被髮送給reduce。

圖4.13說明了組合鍵的構成。它還包括了一個用於reduce端的組合值(composite value)。組合值讓reduce能夠訪問次鍵。

 

 

在介紹了組合鍵類以後,接下來具體說明分區,排序和分組階段以及他們的實現。

 

組合鍵(COMPOSITE KEY)

組合鍵包括姓氏和名字。它擴展了WritableComparable。WritableComparable被推薦用於map函數輸出鍵的Writable類。

 

 1 public class Person implements WritableComparable<Person> {
 2 
 3     private String firstName;
 4     private String lastName;
 5     
 6     @Override
 7     public void readFields(DataInput in) throws IOException {
 8         this.firstName = in.readUTF();
 9         this.lastName = in.readUTF();
10     }
11     
12     @Override
13     public void write(DataOutput out) throws IOException {
14         out.writeUTF(firstName);
15         out.writeUTF(lastName);
16     }
17 ...

 

圖4.14說明了分區,排序和分組的類的名字和方法的設置。同時還有各個類如何使用組合鍵。

 

 

接下來是對其它類的實現代碼的介紹。

 

分區器(PARTITIONER)

分區器用來決定map的輸出值應該分配到哪一個reduce。MapReduce的默認分區器(HashPartitioner)調用輸出鍵的hashCode方法,而後用hashCode方法的結果對reduce的數量進行一個模數(modulo)運算,最後獲得那個目標reduce。默認的分區器使用整個鍵。這就不適於組合鍵了。由於它可能把有一樣天然鍵的組合鍵發送給不一樣的reduce。所以,就須要自定義分區器,基於天然鍵進行分區。

如下代碼實現了分區器的接口。getPartition方法的輸入參數有key,value和分區的數量:

 

1 public interface Partitioner<K2, V2> extends JobConfigurable {
2     int getPartition(K2 key, V2 value, int numPartitions);
3 }

 

自定義的分區器將基於Person類中的姓計算哈希值,而後將這個哈希值對分區的數量進行模運算。在這裏,分區的數量就是reduce的數量:

 

1 public class PersonNamePartitioner extends Partitioner<Person, Text> {
2 
3     @Override
4     public int getPartition(Person key, Text value, int numPartitions) {
5         return Math.abs(key.getLastName().hashCode() * 127) % numPartitions;
6     }
7 
8 }

 

排序(SORTING)

Map端和reduce端都要進行排序。Map端排序的目的是讓reduce端的排序更加高效。這裏將讓MapReduce使用組合鍵的全部值進行排序,也就是基於姓氏和名字。

在下列例子中實現了WritableComparator。WritableComparator比較用戶的姓氏和名字。

 

 1 public class PersonComparator extends WritableComparator {
 2 
 3     protected PersonComparator() {
 4         super(Person.class, true);
 5     }
 6 
 7     @Override
 8     public int compare(WritableComparable w1, WritableComparable w2) {
 9     
10         Person p1 = (Person) w1;
11         Person p2 = (Person) w2;
12         
13         int cmp = p1.getLastName().compareTo(p2.getLastName());
14         
15         if (cmp != 0) {
16             return cmp;
17         }
18         
19         return p1.getFirstName().compareTo(p2.getFirstName());
20     }
21 }

 

分組(GROUPING)

當reduce階段將在本地磁盤上的map輸出的記錄進行流化處理(streaming)的時候,須要要進行分組。在分組中,記錄將被按必定方式排成一個有邏輯順序的流,並被傳輸給reduce。

在分組階段,全部的記錄已經通過了次排序。分組比較器須要將有相同姓氏的記錄分在同一個組。下面是分組比較器的實現:

 

 1 public class PersonNameComparator extends WritableComparator {
 2 
 3     protected PersonNameComparator() {
 4         super(Person.class, true);
 5     }
 6     
 7     @Override
 8     public int compare(WritableComparable o1, WritableComparable o2) {
 9         Person p1 = (Person) o1;
10         Person p2 = (Person) o2;
11         return p1.getLastName().compareTo(p2.getLastName());
12     }
13 }

 

MAPREDUCE

最後一步是告訴MapReduce使用自定義的分區器類,排序比較器類和分組比較器類:

 

1 job.setPartitionerClass(PersonNamePartitioner.class);
2 job.setSortComparatorClass(PersonComparator.class);
3 job.setGroupingComparatorClass(PersonNameComparator.class);

 

而後須要實現map和reduce代碼。Map類建立具備姓和名的組合鍵,而後將它做爲輸出鍵。將名字做爲輸出值。

Reduce類的輸出和輸入同樣:

 

 1 public static class Map extends Mapper<Text, Text, Person, Text> {
 2 
 3     private Person outputKey = new Person();
 4     
 5     @Override
 6     protected void map(Text lastName, Text firstName, Context context)
 7         throws IOException, InterruptedException {
 8         
 9         outputKey.set(lastName.toString(), firstName.toString());
10         context.write(outputKey, firstName);
11         
12     }
13 }
14 
15 public static class Reduce extends Reducer<Person, Text, Text, Text> {
16 
17     Text lastName = new Text();
18     
19     @Override
20     public void reduce(Person key, Iterable<Text> values, Context context)
21         throws IOException, InterruptedException {
22         
23         lastName.set(key.getLastName());
24         
25         for (Text firstName : values) {
26             context.write(lastName, firstName);
27         }
28     }
29 }

 

上傳一個包含了亂序的名字的小文件,並測試次排序是否可以生成已經根據名字排序好的結果:

 

$ hadoop fs -put test-data/ch4/usernames.txt .

$ hadoop fs -cat usernames.txt
Smith John
Smith Anne
Smith Ken

$ bin/run.sh com.manning.hip.ch4.sort.secondary.SortMapReduce usernames.txt output

$ hadoop fs -cat output/part*
Smith Anne
Smith John
Smith Ken

 

上面的結果和指望一致。

 

小結

這一節展現了MapReduce中如何使用次排序。下一部分介紹如何將多個reduce的結果作整體排序。

相關文章
相關標籤/搜索