[大牛翻譯系列]Hadoop(13)MapReduce 性能調優:優化洗牌(shuffle)和排序階段

6.4.3 優化洗牌(shuffle)和排序階段

洗牌和排序階段都很耗費資源。洗牌須要在map和reduce任務之間傳輸數據,會致使過大的網絡消耗。排序和合並操做的消耗也是很顯著的。這一節將介紹一系列的技術來緩解洗牌和排序階段的消耗。數組

 

技術46 規避使用reduce網絡

Reduce在用於鏈接數據集的時候將會產生大量的網絡消耗。ide

 

問題函數

須要考慮在MapReduce規避reduce的使用。工具


方案oop

經過將MapReduce參數setNumReduceTasks設置爲0來建立一個只有map的做業。學習


討論優化

洗牌和排序階段通常都是用來鏈接數據集。但鏈接操做並不必定須要洗牌和排序,正如第4章中所介紹的。知足必定條件的鏈接能夠只在map端運行。那麼就只須要只有map的做業了。設置只有map的做業的命令以下。this

 

job.setNumReduceTasks(0);


小結spa

一個只有map的做業的OutputFormat是和普通做業中reduce的OutputFormat同樣。如圖6.39所示。

 

 

若是沒法規避reduce,那麼就要儘可能減少它對你的做業執行時間的影響。

 

技術47 過濾和投影

Map到Reduce之間傳輸數據要經過網絡,這個成本很高。


問題

須要減小被洗牌的數據。


方案

減小map輸出的每條記錄的大小,並儘量地減小map輸出的數據量。


討論

過濾和投影是關係運算中的概念,用以減小須要處理的數據。這些概念也能夠用到MapReduce中減小map任務須要輸出的數據。如下是過濾和投影的簡明定義:

  • 過濾是減小map輸出的數據量。
  • 投影是減小map輸出的每條記錄的大小。

如下是上述概念的演示代碼:

 

 1 Text outputKey = new Text();
 2 Text outputValue = new Text();
 3 
 4 @Override
 5 public void map(LongWritable key, Text value,
 6                 OutputCollector<Text, Text> output,
 7                 Reporter reporter) throws IOException {
 8                 
 9     String v = value.toString();
10     
11     if (!v.startsWith("10.")) {
12         String[] parts = StringUtils.split(v, ".", 3);
13         outputKey.set(parts[0]);
14         outputValue.set(parts[1]);
15         output.collect(outputKey, outputValue);
16     }
17 }


小結

過濾和投影是在須要顯著減小MapReduce做業運行時間時最容易的方法中的兩種。

若是已經應用了這兩種方法,但還須要進一步減小運行時間。那麼就能夠考慮combine。

 

技術48 使用combine

Combine能夠在map階段進行聚合操做來減小須要發送到reduce的數據。它是一個map端的優化工具,以map的輸出做爲輸入。


問題

須要在過濾和投影后進一步減小運行時間。


方案

定義一個combine。在做業代碼中使用setCombinerClass來調用它。


討論

在map輸出數據到磁盤的過程當中,有兩個子過程:溢灑(spill)子過程,合併子過程。Combine在這兩個子過程當中都會被調用,如圖6.40所示。爲了讓combine在分組數據中效率最大,能夠在兩個子過程調用combine以前進行初步(precursory)的排序。

 

 

與設置map類相似,做業使用setCombinClass來設置combine。

 

job.setCombinerClass(Combine.class);

 

Combine的實現必須嚴格聽從reduce的規格說明。這裏將假定使用技術39種的map。將map的輸出中的記錄按照下述條件合併:第二個八進制數相同。代碼以下。

 

 1 public static class Combine implements Reducer<Text, Text, Text, Text> {
 2     
 3     @Override
 4     public void reduce(Text key, Iterator<Text> values,
 5                         OutputCollector<Text,
 6                         Text> output,
 7                         Reporter reporter) throws IOException {
 8                         
 9         Text prev = null;
10         while (values.hasNext()) {
11             Text t = values.next();
12             if (!t.equals(prev)) {
13                 output.collect(key, t);
14             }
15             prev = ReflectionUtils.copy(job, t, prev);
16         }
17     }
18 }

 

Combine函數必須是可分佈的(distributive)。如圖6.40(在前面)所示,combine要被調用屢次處理多個具備相同輸入鍵的記錄。這些記錄的順序是不可預測的。可分佈函數是指,不論輸入數據的順序如何,最終的結果都同樣。


小結

在MapReduce中combine很是有用,它可以減小map和reduce之間的網絡傳輸數據和網絡負載。下一個減小執行時間的有用工具就是二進制比較器。

 

技術49 用Comparator進行超快排序

MapReduce默認使用RawComparator對map的輸出鍵進行比較排序。內置的Writable類(例如Text和IntWritable)是字節級實現。這樣不用將字節形式的類解排列(unmarshal)成類對象。若是要經過WritableComparable實現自定義Writable,就有可能延長洗牌和排序階段的時間,由於它須要進行解排列。


問題

存在自定義的Writable。須要減小做業的排序時間。


方案

實現字節級的Comparator來優化排序中的比較過程。

 

討論

在MapReduce中不少階段,排序是經過比較輸出鍵來進行的。爲了加快鍵排序,全部的map輸出鍵必須實現WritableComparable接口。

 

1 public interface WritableComparable<T> extends Writable, Comparable<T> {
2 
3 }

 

若是對4.2.1中的Person類進行改造,實現代碼以下。

 

 1 public class Person implements WritableComparable<Person> {
 2     private String firstName;
 3     private String lastName;
 4     
 5     @Override
 6     public int compareTo(Person other) {
 7         int cmp = this.lastName.compareTo(other.lastName);
 8         if (cmp != 0) {
 9             return cmp;
10         }
11         return this.firstName.compareTo(other.firstName);
12     }
13 ...

 

這個Comparator的問題在於,若是要進行比較,就須要將字節形式的map的中間結果數據解排列成Writable形式。解排列要從新建立對象,所以成本很高。

Hadoop中的自帶的各類Writable類不但擴展了WritableComparable接口,也提供了基於WritableComparator類的自定義Comparator。代碼以下。

 

 1 public class WritableComparator implements RawComparator {
 2 
 3     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 4     
 5         try {
 6             buffer.reset(b1, s1, l1);
 7             key1.readFields(buffer);
 8 
 9             buffer.reset(b2, s2, l2);
10             key2.readFields(buffer);
11         } catch (IOException e) {
12             throw new RuntimeException(e);
13         }
14         return compare(key1, key2);
15     }
16     
17     /** Compare two WritableComparables.
18     *
19     * <p> The default implementation uses the natural ordering,
20     * calling {@link
21     * Comparable#compareTo(Object)}. */
22     @SuppressWarnings("unchecked")
23     public int compare(WritableComparable a, WritableComparable b) {
24         return a.compareTo(b);
25     }
26     ...
27 }

 

要實現字節級的Comparator,須要重載compare方法。這裏先學習一下IntWritable類如何實現這個方法。

 

 1 public class IntWritable implements WritableComparable {
 2 
 3     public static class Comparator extends WritableComparator {
 4     
 5         public Comparator() {
 6             super(IntWritable.class);
 7         }
 8         
 9         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10             int thisValue = readInt(b1, s1);
11             int thatValue = readInt(b2, s2);
12             return (thisValue<thatValue ? -1 :
13             (thisValue==thatValue ? 0 : 1));
14         }
15     }
16     
17     static {
18         WritableComparator.define(IntWritable.class, new Comparator());
19     }

 

若是隻使用內置的Writable,那就沒有必要實現WritableComparator。它們都自帶。若是須要使用自定義的Writable做爲輸出鍵,那麼就須要自定義WritableComparator。這裏基於前述Person類來講明如何實現。

在Person類中,有兩個字符串類屬性,firstName和lastName。使用writeUTF方法經過DataOutput輸出它們。如下是實現代碼。

 

1 private String firstName;
2 private String lastName;
3 
4 @Override
5 public void write(DataOutput out) throws IOException {
6     out.writeUTF(lastName);
7     out.writeUTF(firstName);
8 }

 

首先須要理解Person對象是如何用字節形式表示的。writeUTF方法輸出了字節長度(2個字節),字符內容(字符的長度,L1個字節)。如圖6.41描述了字節是如何排列的。

 

 

假設須要對lastName和firstName進行字典式地比較(譯註:就是看字典中的前後順序)。顯然不能直接用整個字節數組,由於其中還有字符長度。那麼Comparator就須要足夠聰明到可以跳過字符長度。如下是實現代碼。

 

 1 @Override
 2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 3 
 4     int lastNameResult = compare(b1, s1, b2, s2);
 5     if (lastNameResult != 0) {
 6         return lastNameResult;
 7     }
 8     int b1l1 = readUnsignedShort(b1, s1);
 9     int b2l1 = readUnsignedShort(b2, s2);
10     return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2);
11 }
12 
13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) {
14     int b1l1 = readUnsignedShort(b1, s1);
15     int b2l1 = readUnsignedShort(b2, s2);
16     return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1);
17 }
18 
19 public static int readUnsignedShort(byte[] b, int offset) {
20     int ch1 = b[offset];
21     int ch2 = b[offset + 1];
22     return (ch1 << 8) + (ch2);
23 }

 

小結

 writeUTF只支持小於65536字符的字符串類。對於人名來講,是足夠了。大點的,可能就不行。這個時候就須要使用Hadoop的Text類來支持更大的字符串。Text類中的Comparator類的二進制字符串比較器的實現機制和剛纔介紹的大體至關。(這個修飾真長。)那麼針對Text類的lastName和firstName的Comparator的實現方式也會累死。

下一節將介紹如何減少數據傾斜的影響。

相關文章
相關標籤/搜索