洗牌和排序階段都很耗費資源。洗牌須要在map和reduce任務之間傳輸數據,會致使過大的網絡消耗。排序和合並操做的消耗也是很顯著的。這一節將介紹一系列的技術來緩解洗牌和排序階段的消耗。數組
技術46 規避使用reduce網絡
Reduce在用於鏈接數據集的時候將會產生大量的網絡消耗。ide
問題函數
須要考慮在MapReduce規避reduce的使用。工具
方案oop
經過將MapReduce參數setNumReduceTasks設置爲0來建立一個只有map的做業。學習
討論優化
洗牌和排序階段通常都是用來鏈接數據集。但鏈接操做並不必定須要洗牌和排序,正如第4章中所介紹的。知足必定條件的鏈接能夠只在map端運行。那麼就只須要只有map的做業了。設置只有map的做業的命令以下。this
job.setNumReduceTasks(0);
小結spa
一個只有map的做業的OutputFormat是和普通做業中reduce的OutputFormat同樣。如圖6.39所示。
若是沒法規避reduce,那麼就要儘可能減少它對你的做業執行時間的影響。
技術47 過濾和投影
Map到Reduce之間傳輸數據要經過網絡,這個成本很高。
問題
須要減小被洗牌的數據。
方案
減小map輸出的每條記錄的大小,並儘量地減小map輸出的數據量。
討論
過濾和投影是關係運算中的概念,用以減小須要處理的數據。這些概念也能夠用到MapReduce中減小map任務須要輸出的數據。如下是過濾和投影的簡明定義:
如下是上述概念的演示代碼:
1 Text outputKey = new Text(); 2 Text outputValue = new Text(); 3 4 @Override 5 public void map(LongWritable key, Text value, 6 OutputCollector<Text, Text> output, 7 Reporter reporter) throws IOException { 8 9 String v = value.toString(); 10 11 if (!v.startsWith("10.")) { 12 String[] parts = StringUtils.split(v, ".", 3); 13 outputKey.set(parts[0]); 14 outputValue.set(parts[1]); 15 output.collect(outputKey, outputValue); 16 } 17 }
小結
過濾和投影是在須要顯著減小MapReduce做業運行時間時最容易的方法中的兩種。
若是已經應用了這兩種方法,但還須要進一步減小運行時間。那麼就能夠考慮combine。
技術48 使用combine
Combine能夠在map階段進行聚合操做來減小須要發送到reduce的數據。它是一個map端的優化工具,以map的輸出做爲輸入。
問題
須要在過濾和投影后進一步減小運行時間。
方案
定義一個combine。在做業代碼中使用setCombinerClass來調用它。
討論
在map輸出數據到磁盤的過程當中,有兩個子過程:溢灑(spill)子過程,合併子過程。Combine在這兩個子過程當中都會被調用,如圖6.40所示。爲了讓combine在分組數據中效率最大,能夠在兩個子過程調用combine以前進行初步(precursory)的排序。
與設置map類相似,做業使用setCombinClass來設置combine。
job.setCombinerClass(Combine.class);
Combine的實現必須嚴格聽從reduce的規格說明。這裏將假定使用技術39種的map。將map的輸出中的記錄按照下述條件合併:第二個八進制數相同。代碼以下。
1 public static class Combine implements Reducer<Text, Text, Text, Text> { 2 3 @Override 4 public void reduce(Text key, Iterator<Text> values, 5 OutputCollector<Text, 6 Text> output, 7 Reporter reporter) throws IOException { 8 9 Text prev = null; 10 while (values.hasNext()) { 11 Text t = values.next(); 12 if (!t.equals(prev)) { 13 output.collect(key, t); 14 } 15 prev = ReflectionUtils.copy(job, t, prev); 16 } 17 } 18 }
Combine函數必須是可分佈的(distributive)。如圖6.40(在前面)所示,combine要被調用屢次處理多個具備相同輸入鍵的記錄。這些記錄的順序是不可預測的。可分佈函數是指,不論輸入數據的順序如何,最終的結果都同樣。
小結
在MapReduce中combine很是有用,它可以減小map和reduce之間的網絡傳輸數據和網絡負載。下一個減小執行時間的有用工具就是二進制比較器。
技術49 用Comparator進行超快排序
MapReduce默認使用RawComparator對map的輸出鍵進行比較排序。內置的Writable類(例如Text和IntWritable)是字節級實現。這樣不用將字節形式的類解排列(unmarshal)成類對象。若是要經過WritableComparable實現自定義Writable,就有可能延長洗牌和排序階段的時間,由於它須要進行解排列。
問題
存在自定義的Writable。須要減小做業的排序時間。
方案
實現字節級的Comparator來優化排序中的比較過程。
討論
在MapReduce中不少階段,排序是經過比較輸出鍵來進行的。爲了加快鍵排序,全部的map輸出鍵必須實現WritableComparable接口。
1 public interface WritableComparable<T> extends Writable, Comparable<T> { 2 3 }
若是對4.2.1中的Person類進行改造,實現代碼以下。
1 public class Person implements WritableComparable<Person> { 2 private String firstName; 3 private String lastName; 4 5 @Override 6 public int compareTo(Person other) { 7 int cmp = this.lastName.compareTo(other.lastName); 8 if (cmp != 0) { 9 return cmp; 10 } 11 return this.firstName.compareTo(other.firstName); 12 } 13 ...
這個Comparator的問題在於,若是要進行比較,就須要將字節形式的map的中間結果數據解排列成Writable形式。解排列要從新建立對象,所以成本很高。
Hadoop中的自帶的各類Writable類不但擴展了WritableComparable接口,也提供了基於WritableComparator類的自定義Comparator。代碼以下。
1 public class WritableComparator implements RawComparator { 2 3 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 4 5 try { 6 buffer.reset(b1, s1, l1); 7 key1.readFields(buffer); 8 9 buffer.reset(b2, s2, l2); 10 key2.readFields(buffer); 11 } catch (IOException e) { 12 throw new RuntimeException(e); 13 } 14 return compare(key1, key2); 15 } 16 17 /** Compare two WritableComparables. 18 * 19 * <p> The default implementation uses the natural ordering, 20 * calling {@link 21 * Comparable#compareTo(Object)}. */ 22 @SuppressWarnings("unchecked") 23 public int compare(WritableComparable a, WritableComparable b) { 24 return a.compareTo(b); 25 } 26 ... 27 }
要實現字節級的Comparator,須要重載compare方法。這裏先學習一下IntWritable類如何實現這個方法。
1 public class IntWritable implements WritableComparable { 2 3 public static class Comparator extends WritableComparator { 4 5 public Comparator() { 6 super(IntWritable.class); 7 } 8 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 10 int thisValue = readInt(b1, s1); 11 int thatValue = readInt(b2, s2); 12 return (thisValue<thatValue ? -1 : 13 (thisValue==thatValue ? 0 : 1)); 14 } 15 } 16 17 static { 18 WritableComparator.define(IntWritable.class, new Comparator()); 19 }
若是隻使用內置的Writable,那就沒有必要實現WritableComparator。它們都自帶。若是須要使用自定義的Writable做爲輸出鍵,那麼就須要自定義WritableComparator。這裏基於前述Person類來講明如何實現。
在Person類中,有兩個字符串類屬性,firstName和lastName。使用writeUTF方法經過DataOutput輸出它們。如下是實現代碼。
1 private String firstName; 2 private String lastName; 3 4 @Override 5 public void write(DataOutput out) throws IOException { 6 out.writeUTF(lastName); 7 out.writeUTF(firstName); 8 }
首先須要理解Person對象是如何用字節形式表示的。writeUTF方法輸出了字節長度(2個字節),字符內容(字符的長度,L1個字節)。如圖6.41描述了字節是如何排列的。
假設須要對lastName和firstName進行字典式地比較(譯註:就是看字典中的前後順序)。顯然不能直接用整個字節數組,由於其中還有字符長度。那麼Comparator就須要足夠聰明到可以跳過字符長度。如下是實現代碼。
1 @Override 2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 3 4 int lastNameResult = compare(b1, s1, b2, s2); 5 if (lastNameResult != 0) { 6 return lastNameResult; 7 } 8 int b1l1 = readUnsignedShort(b1, s1); 9 int b2l1 = readUnsignedShort(b2, s2); 10 return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2); 11 } 12 13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) { 14 int b1l1 = readUnsignedShort(b1, s1); 15 int b2l1 = readUnsignedShort(b2, s2); 16 return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1); 17 } 18 19 public static int readUnsignedShort(byte[] b, int offset) { 20 int ch1 = b[offset]; 21 int ch2 = b[offset + 1]; 22 return (ch1 << 8) + (ch2); 23 }
小結
writeUTF只支持小於65536字符的字符串類。對於人名來講,是足夠了。大點的,可能就不行。這個時候就須要使用Hadoop的Text類來支持更大的字符串。Text類中的Comparator類的二進制字符串比較器的實現機制和剛纔介紹的大體至關。(這個修飾真長。)那麼針對Text類的lastName和firstName的Comparator的實現方式也會累死。
下一節將介紹如何減少數據傾斜的影響。