在MapReduce中,排序的目的有兩個:算法
例如:須要瞭解前N個最受歡迎的用戶或網頁的數據分析工做。app
在這一節中,有兩個場景須要對MapReduce的排序行爲進行優化。ide
次排序能夠根據reduce的鍵對它的值進行排序。若是要求一些數據先於另一些數據到達reduce,次排序就頗有用。(這一章在講解優化過的重分區鏈接中也提到了這樣的場景。)另外一個場景中,須要將做業的輸出根據兩個鍵進行排序,一個鍵的優先級高於另一個鍵(secondary key)。這個場景也能夠用到次排序。例如:將股票數據先根據股票標誌進行主排序(primary sort),而後根據股票配額進行次排序。本書不少技術中將會運用次排序,如重分區鏈接的優化,朋友圖算法等。函數
這一節第二部分中,將探討對reduce的輸出的所有數據進行整體排序。這在分析數據集中的前N個元素或後N個元素時會比較有用。oop
在前一節(MapReduce鏈接)中,次排序用於使一部分數據先於另一部分到達reduce。做爲基礎知識,學習次排序前須要瞭解MapReduce中的數據整理和數據流。圖4.12說明了三個影響數據整理和數據流(分區,排序,分組)的元素,而且說明了這些元素如何整合到MapReduce中。學習
在map輸出收集(output collection)階段,由分區器(Partitioner)選擇哪一個reduce應該接收map的輸出。map輸出的各個分區的數據,由RawComparator進行排序。Reduce端也用RawComparator進行排序。而後,由RawComparator對排序好的數據進行分組。測試
技術21 實現次排序優化
對於某個map的鍵的全部值,若是須要其中一部分值先於另一部分值到達reduce,就能夠用到次排序。次排序還用在了本書的第7章中的朋友圖算法,和通過優化的重分區排序中。this
問題spa
在發送給某個reduce的數據中,須要對某個天然鍵(natural key)的值進行排序。
方案
這個技術中將應用到自定義分區類,排序比較類(sort comparator),分組比較類(grouping comparator)。這些是實現次排序的基礎。
討論
在這個技術中,使用次排序來對人的名字進行排序。具體步驟是:先用主排序對人的姓排序,再用次排序對人的名字排序。
次排序須要在map函數中生成組合鍵(composite key)做爲輸出鍵。
組合輸出鍵包括兩個部分:
圖4.13說明了組合鍵的構成。它還包括了一個用於reduce端的組合值(composite value)。組合值讓reduce能夠訪問次鍵。
在介紹了組合鍵類以後,接下來具體說明分區,排序和分組階段以及他們的實現。
組合鍵(COMPOSITE KEY)
組合鍵包括姓氏和名字。它擴展了WritableComparable。WritableComparable被推薦用於map函數輸出鍵的Writable類。
1 public class Person implements WritableComparable<Person> { 2 3 private String firstName; 4 private String lastName; 5 6 @Override 7 public void readFields(DataInput in) throws IOException { 8 this.firstName = in.readUTF(); 9 this.lastName = in.readUTF(); 10 } 11 12 @Override 13 public void write(DataOutput out) throws IOException { 14 out.writeUTF(firstName); 15 out.writeUTF(lastName); 16 } 17 ...
圖4.14說明了分區,排序和分組的類的名字和方法的設置。同時還有各個類如何使用組合鍵。
接下來是對其它類的實現代碼的介紹。
分區器(PARTITIONER)
分區器用來決定map的輸出值應該分配到哪一個reduce。MapReduce的默認分區器(HashPartitioner)調用輸出鍵的hashCode方法,而後用hashCode方法的結果對reduce的數量進行一個模數(modulo)運算,最後獲得那個目標reduce。默認的分區器使用整個鍵。這就不適於組合鍵了。由於它可能把有一樣天然鍵的組合鍵發送給不一樣的reduce。所以,就須要自定義分區器,基於天然鍵進行分區。
如下代碼實現了分區器的接口。getPartition方法的輸入參數有key,value和分區的數量:
1 public interface Partitioner<K2, V2> extends JobConfigurable { 2 int getPartition(K2 key, V2 value, int numPartitions); 3 }
自定義的分區器將基於Person類中的姓計算哈希值,而後將這個哈希值對分區的數量進行模運算。在這裏,分區的數量就是reduce的數量:
1 public class PersonNamePartitioner extends Partitioner<Person, Text> { 2 3 @Override 4 public int getPartition(Person key, Text value, int numPartitions) { 5 return Math.abs(key.getLastName().hashCode() * 127) % numPartitions; 6 } 7 8 }
排序(SORTING)
Map端和reduce端都要進行排序。Map端排序的目的是讓reduce端的排序更加高效。這裏將讓MapReduce使用組合鍵的全部值進行排序,也就是基於姓氏和名字。
在下列例子中實現了WritableComparator。WritableComparator比較用戶的姓氏和名字。
1 public class PersonComparator extends WritableComparator { 2 3 protected PersonComparator() { 4 super(Person.class, true); 5 } 6 7 @Override 8 public int compare(WritableComparable w1, WritableComparable w2) { 9 10 Person p1 = (Person) w1; 11 Person p2 = (Person) w2; 12 13 int cmp = p1.getLastName().compareTo(p2.getLastName()); 14 15 if (cmp != 0) { 16 return cmp; 17 } 18 19 return p1.getFirstName().compareTo(p2.getFirstName()); 20 } 21 }
分組(GROUPING)
當reduce階段將在本地磁盤上的map輸出的記錄進行流化處理(streaming)的時候,須要要進行分組。在分組中,記錄將被按必定方式排成一個有邏輯順序的流,並被傳輸給reduce。
在分組階段,全部的記錄已經通過了次排序。分組比較器須要將有相同姓氏的記錄分在同一個組。下面是分組比較器的實現:
1 public class PersonNameComparator extends WritableComparator { 2 3 protected PersonNameComparator() { 4 super(Person.class, true); 5 } 6 7 @Override 8 public int compare(WritableComparable o1, WritableComparable o2) { 9 Person p1 = (Person) o1; 10 Person p2 = (Person) o2; 11 return p1.getLastName().compareTo(p2.getLastName()); 12 } 13 }
MAPREDUCE
最後一步是告訴MapReduce使用自定義的分區器類,排序比較器類和分組比較器類:
1 job.setPartitionerClass(PersonNamePartitioner.class); 2 job.setSortComparatorClass(PersonComparator.class); 3 job.setGroupingComparatorClass(PersonNameComparator.class);
而後須要實現map和reduce代碼。Map類建立具備姓和名的組合鍵,而後將它做爲輸出鍵。將名字做爲輸出值。
Reduce類的輸出和輸入同樣:
1 public static class Map extends Mapper<Text, Text, Person, Text> { 2 3 private Person outputKey = new Person(); 4 5 @Override 6 protected void map(Text lastName, Text firstName, Context context) 7 throws IOException, InterruptedException { 8 9 outputKey.set(lastName.toString(), firstName.toString()); 10 context.write(outputKey, firstName); 11 12 } 13 } 14 15 public static class Reduce extends Reducer<Person, Text, Text, Text> { 16 17 Text lastName = new Text(); 18 19 @Override 20 public void reduce(Person key, Iterable<Text> values, Context context) 21 throws IOException, InterruptedException { 22 23 lastName.set(key.getLastName()); 24 25 for (Text firstName : values) { 26 context.write(lastName, firstName); 27 } 28 } 29 }
上傳一個包含了亂序的名字的小文件,並測試次排序是否可以生成已經根據名字排序好的結果:
$ hadoop fs -put test-data/ch4/usernames.txt . $ hadoop fs -cat usernames.txt Smith John Smith Anne Smith Ken $ bin/run.sh com.manning.hip.ch4.sort.secondary.SortMapReduce usernames.txt output $ hadoop fs -cat output/part* Smith Anne Smith John Smith Ken
上面的結果和指望一致。
小結
這一節展現了MapReduce中如何使用次排序。下一部分介紹如何將多個reduce的結果作整體排序。