附錄以前總結的一個例子:html
http://www.cnblogs.com/DreamDrive/p/7398455.htmljava
另外兩個有價值的博文:apache
http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html數組
http://blog.csdn.net/heyutao007/article/details/5890103ide
在mapreduce操做時,shuffle階段會屢次根據key值排序。可是在shuffle分組後,相同key值的values序列的順序是不肯定的(以下圖)。若是想要此時value值也是排序好的,這種需求就是二次排序。函數
a 1 a 5 a 7 a 9 b 3 b 8 b 10 |
a 9 a 7 a 5 a 1 b 10 b 8 b 3 |
直接在reduce端對分組後的values進行排序。oop
reduce關鍵代碼測試
1 @Override 2 public void reduce(Text key, Iterable<IntWritable> values, Context context) 3 throws IOException, InterruptedException { 4 5 List<Integer> valuesList = new ArrayList<Integer>(); 6 7 // 取出value 8 for(IntWritable value : values) { 9 valuesList.add(value.get()); 10 } 11 // 進行排序 12 Collections.sort(valuesList); 13 14 for(Integer value : valuesList) { 15 context.write(key, new IntWritable(value)); 16 } 17 18 }
輸出結果:this
a 1 a 5 a 7 a 9 b 3 b 8 b 10
很容易發現,這樣把排序工做都放到reduce端完成,當values序列長度很是大的時候,會對CPU和內存形成極大的負載。spa
在reduce端對values進行迭代的時候,不要直接存儲value值或者key值,由於reduce方法會反覆執行屢次,但key和value相關的對象只有兩個,reduce會反覆重用這兩個對象。須要用相應的數據類型.get()取出後再存儲。
將map端輸出的<key,value>中的key和value組合成一個新的key(稱爲newKey),value值不變。這裏就變成<(key,value),value>,在針對newKey排序的時候,若是key相同,就再對value進行排序。
須要自定義的地方
1.自定義數據類型實現組合key
實現方式:繼承WritableComparable
2.自定義partioner,造成newKey後保持分區規則任然按照key進行。保證不打亂原來的分區。
實現方式:繼承partitioner
3.自定義分組,保持分組規則任然按照key進行。不打亂原來的分組
實現方式:繼承RawComparator
自定義數據類型關鍵代碼
1 import java.io.DataInput; 2 import java.io.DataOutput; 3 import java.io.IOException; 4 import org.apache.hadoop.io.WritableComparable; 5 6 public class PairWritable implements WritableComparable<PairWritable> { 7 // 組合key 8 private String first; 9 private int second; 10 11 public PairWritable() { 12 } 13 14 public PairWritable(String first, int second) { 15 this.set(first, second); 16 } 17 18 /** 19 * 方便設置字段 20 */ 21 public void set(String first, int second) { 22 this.first = first; 23 this.second = second; 24 } 25 26 /** 27 * 反序列化 28 */ 29 @Override 30 public void readFields(DataInput arg0) throws IOException { 31 this.first = arg0.readUTF(); 32 this.second = arg0.readInt(); 33 } 34 /** 35 * 序列化 36 */ 37 @Override 38 public void write(DataOutput arg0) throws IOException { 39 arg0.writeUTF(first); 40 arg0.writeInt(second); 41 } 42 43 /* 44 * 重寫比較器 45 */ 46 public int compareTo(PairWritable o) { 47 int comp = this.first.compareTo(o.first); 48 49 if(comp != 0) { 50 return comp; 51 } else { // 若第一個字段相等,則比較第二個字段 52 return Integer.valueOf(this.second).compareTo( 53 Integer.valueOf(o.getSecond())); 54 } 55 } 56 57 public int getSecond() { 58 return second; 59 } 60 public void setSecond(int second) { 61 this.second = second; 62 } 63 public String getFirst() { 64 return first; 65 } 66 public void setFirst(String first) { 67 this.first = first; 68 }
自定義分區規則
1 import org.apache.hadoop.io.IntWritable; 2 import org.apache.hadoop.mapreduce.Partitioner; 3 4 public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> { 5 6 @Override 7 public int getPartition(PairWritable key, IntWritable value, int numPartitions) { 8 /* 9 * 默認的實現 (key.hashCode() & Integer.MAX_VALUE) % numPartitions 10 * 讓key中first字段做爲分區依據 11 */ 12 return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 13 } 14 }
自定義分組比較器
1 import org.apache.hadoop.io.RawComparator; 2 import org.apache.hadoop.io.WritableComparator; 3 4 public class SecondGroupComparator implements RawComparator<PairWritable> { 5 6 /* 7 * 對象比較 8 */ 9 public int compare(PairWritable o1, PairWritable o2) { 10 return o1.getFirst().compareTo(o2.getFirst()); 11 } 12 13 /* 14 * 字節比較 15 * arg0,arg3爲要比較的兩個字節數組 16 * arg1,arg2表示第一個字節數組要進行比較的收尾位置,arg4,arg5表示第二個 17 * 從第一個字節比到組合key中second的前一個字節,由於second爲int型,因此長度爲4 18 */ 19 public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { 20 return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4); 21 } 22 }
map關鍵代碼
1 private PairWritable mapOutKey = new PairWritable(); 2 private IntWritable mapOutValue = new IntWritable(); 3 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 4 String lineValue = value.toString(); 5 String[] strs = lineValue.split("\t"); 6 7 //設置組合key和value ==> <(key,value),value> 8 mapOutKey.set(strs[0], Integer.valueOf(strs[1])); 9 mapOutValue.set(Integer.valueOf(strs[1])); 10 11 context.write(mapOutKey, mapOutValue); 12 }
reduce關鍵代碼
1 private Text outPutKey = new Text(); 2 public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) 3 throws IOException, InterruptedException { 4 //迭代輸出 5 for(IntWritable value : values) { 6 outPutKey.set(key.getFirst()); 7 context.write(outPutKey, value); 8 } 9 10 }
輸出結果:
a 1 a 5 a 7 a 9 b 3 b 8 b 10
在map階段:
使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。
本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的一行的行號做爲key,這一行的文本做爲value。這就是自定義Map的輸入是<LongWritable, Text>的緣由。
而後調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。
在map階段的最後,會先調用job.setPartitionerClass對這個List進行分區,每一個分區映射到一個reducer。每一個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。
能夠看到,這自己就是一個二次排序。若是沒有經過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。
在reduce階段:
reducer接收到全部映射到這個reducer的map輸出後,也是會調用job.setSortComparatorClass設置的key比較函數類對全部數據對排序。
而後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數類。
只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的全部key的第一個key。
最後就是進入Reducer的reduce方法,reduce方法的輸入是全部的(key和它的value迭代器)。一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。