[大牛翻譯系列]Hadoop(1)MapReduce 鏈接:重分區鏈接(Repartition join)

4.1 鏈接(Join)

鏈接是關係運算,能夠用於合併關係(relation)。對於數據庫中的錶鏈接操做,可能已經廣爲人知了。在MapReduce中,鏈接能夠用於合併兩個或多個數據集。例如,用戶基本信息和用戶活動詳情信息。用戶基本信息來自於OLTP數據庫。用戶活動詳情信息來自於日誌文件。html

MapReduce的鏈接操做能夠用於如下場景:數據庫

  • 用戶的人口統計信息的聚合操做(例如:青少年和中年人的習慣差別)。
  • 當用戶超過必定時間沒有使用網站後,發郵件提醒他們。(這個必定時間的閾值是用戶本身預約義的)
  • 分析用戶的瀏覽習慣。讓系統能夠基於這個分析提示用戶有哪些網站特性尚未使用到。進而造成一個反饋循環。

全部這些場景都要求將多個數據集鏈接起來。apache

最經常使用的兩個鏈接類型是內鏈接(inner join)和外鏈接(outer join)。以下圖所示,內鏈接比較兩個關係中全部的元組,判斷是否知足鏈接條件,而後生成一個知足鏈接條件的結果集。與內鏈接相反的是,外鏈接並不須要兩個關係的元組都知足鏈接條件。在鏈接條件不知足的時候,外鏈接能夠將其中一方的數據保留在結果集中。數組

 

 

爲了實現內鏈接和外鏈接,MapReduce中有三種鏈接策略,以下所示。這三種鏈接策略有的在map階段,有的在reduce階段。它們都針對MapReduce的排序-合併(sort-merge)的架構進行了優化。緩存

  1. 重分區鏈接(Repartition join)—— reduce端鏈接。使用場景:鏈接兩個或多個大型數據集。
  2. 複製鏈接(Replication join)—— map端鏈接。使用場景:待鏈接的數據集中有一個數據集足夠小到能夠徹底放在緩存中。
  3. 半鏈接(Semi-join)—— 另外一個map端鏈接。使用場景:待鏈接的數據集中有一個數據集很是大,但同時這個數據集能夠被過濾成小到能夠放在緩存中。

在介紹完這些鏈接策略以後,還會介紹另外一個策略:決策樹。能夠根據實際狀況選擇最優策略。網絡

 

4.1.1 重分區鏈接(Repartition join)

重分區鏈接是reduce端鏈接。它利用MapReduce的排序-合併機制來分組數據。它只使用一個單獨的MapReduce任務,並支持多路鏈接(N-way join)。多路指的是多個數據集。架構

Map階段負責從多個數據集中讀取數據,決定每一個數據的鏈接值,將鏈接值做爲輸出鍵(output key)。輸出值(output value)則包含將在reduce階段被合併的值。app

Reduce階段,一個reduce接收map函數傳來的每個輸出鍵的全部輸出值,並將數據分爲多個分區。在此以後,reduce對全部的分區進行笛卡爾積(Cartersian product)鏈接運算,並生成所有的結果集。框架

以上MapReduce過程如圖4.2所示:ide

 

 

注:過濾(filtering)和投影(projection)

在MapReduce重分區鏈接中,最好可以減小map階段傳輸到reduce階段的數據量。由於經過網絡在這兩個階段中排序和傳輸數據會產生很高的成本。若是不能避免reduce端的工做,那麼一個最佳實踐就是儘量在map階段多過濾數據和投影。過濾指的是將map極端的輸入數據中不須要的部分丟棄。投影是關係代數的概念。投影用於減小發送給reduce的字段。例如:在分析用戶數據的時候,若是隻關注用戶的年齡,那麼在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其餘的字段。

 

技術19:優化重分區鏈接

《Hadoop in Action》給出了一個例子,說明如何使用Hadoop的社區包(contrib package)org.apache.hadoop.contrib.utils.join實現重分區鏈接。這個貢獻包打包了全部的處理細節,僅僅須要實現一個很是簡單的方法。

然而,這個社區包對重分區的實現方法的空間效率低下。它須要將待鏈接的全部輸出值都讀取到內存中,而後進行多路鏈接(multiway join)。實際上,若是僅僅將小數據集讀取到內存中,而後用小數據集遍歷大數據集來進行鏈接,這樣將更加高效。

 

問題

須要在MapReduce中進行重分區鏈接,可是不但願在reduce階段將全部的數據都放到緩存中。

 

解決方案

這個技術運用了優化後的重分區框架。它僅僅將一個待鏈接的數據集放在緩存中,減小了reduce須要放在緩存中的數據。

 

討論

附錄D.1(http://www.cnblogs.com/datacloud/p/3617079.html)中介紹了優化後的重分區框架的實現。這個實現是根據org.apache.hadoop.contrib.utils.join社區包進行建模。這個優化後的框架僅僅緩存兩個數據集中比較小的那一個,以減小內存消耗。圖4.3是優化後的重分區鏈接的流程圖:

 

 

 

圖4.4是實現的類圖。類圖中包含兩個部分,一個通用框架和一些類的實現樣例。

 

 

使用這個鏈接框架須要實現抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。

例如,須要鏈接用戶詳情數據和用戶活動日誌。第一步,判斷兩個數據集中那一個比較小。對於通常的網站來講,用戶詳情數據會比較小,用戶活動日誌會比較大。

在以下示例中,用戶數據中有用戶姓名,年齡和所在州

 

$ cat test-data/ch4/users.txt
anne 22 NY
joe 39 CO
alison 35 NY
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA
mary 53 NY
dave 36 VA
dude 50 CA

 

用戶活動日誌中有用戶姓名,進行的動做,來源IP。這個文件通常都要比用戶數據要大得多。

 

$ cat test-data/ch4/user-logs.txt
jim logout 93.24.237.12
mike new_tweet 87.124.79.252
bob new_tweet 58.133.120.100
mike logout 55.237.104.36
jim new_tweet 93.24.237.12
marie view_user 122.158.130.90

 

首先,必須實現抽象類OptimizedDataJoinMapperBase。這個將在map端被調用。這個類將建立map的輸出鍵和輸出值。同時,它還將提示整個框架,當前處理的文件是否是比較小的那個。

 

 1 public class SampleMap extends OptimizedDataJoinMapperBase {
 2 
 3   private boolean smaller;
 4 
 5   @Override
 6   protected Text generateInputTag(String inputFile) {
 7     // tag the row with input file name (data source)
 8     smaller = inputFile.contains("users.txt");
 9     return new Text(inputFile);
10   }
11 
12   @Override
13   protected String genGroupKey(Object key, OutputValue output) {
14     return key.toString();
15   }
16 
17   @Override
18   protected boolean isInputSmaller(String inputFile) {
19     return smaller;
20   }
21 
22   @Override
23   protected OutputValue genMapOutputValue(Object o) {
24     return new TextTaggedOutputValue((Text) o);
25   }
26 }

 

下一步,你須要實現抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調用。在這個類中,將從map端傳入不一樣數據集的輸出鍵和輸出值,而後返回reduce端的輸出數組。

 

 1 public class SampleReduce extends OptimizedDataJoinReducerBase {
 2 
 3   private TextTaggedOutputValue output = new TextTaggedOutputValue();
 4   private Text textOutput = new Text();
 5 
 6   @Override
 7   protected OutputValue combine(String key,
 8                                 OutputValue smallValue,
 9                                 OutputValue largeValue) {
10     if(smallValue == null || largeValue == null) {
11       return null;
12     }
13     Object[] values = {
14         smallValue.getData(), largeValue.getData()
15     };
16     textOutput.set(StringUtils.join(values, "\t"));
17     output.setData(textOutput);
18     return output;
19   }

 

最後,任務的主代碼(driver code)須要指明InputFormat類,並設置次排序(Secondary sort)。

 

 1     job.setInputFormat(KeyValueTextInputFormat.class);
 2 
 3     job.setMapOutputKeyClass(CompositeKey.class);
 4     job.setMapOutputValueClass(TextTaggedOutputValue.class);
 5     job.setOutputKeyClass(Text.class);
 6     job.setOutputValueClass(Text.class);
 7 
 8     job.setPartitionerClass(CompositeKeyPartitioner.class);
 9     job.setOutputKeyComparatorClass(CompositeKeyComparator.class);
10     job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);

 

如今鏈接的準備工做就作完了,能夠開始運行鏈接:

 

$ hadoop fs -put test-data/ch4/users.txt users.txt
$ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt
$ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output
$ hadoop fs -cat output/part*
bob 71 CA new_tweet 58.133.120.100
jim 21 OR logout 93.24.237.12
jim 21 OR new_tweet 93.24.237.12
jim 21 OR login 198.184.237.49
marie 27 OR login 58.133.120.100
marie 27 OR view_user 122.158.130.90
mike 69 VA new_tweet 87.124.79.252
mike 69 VA logout 55.237.104.36

 

若是和鏈接的源文件相對比,能夠看到由於實現了一個內鏈接,輸出中不包括用戶anne,alison等不存在於日誌文件中的記錄。

 

小結:

這個鏈接的實現經過只緩存比較小的數據集來提升來Hadoop社區包的效率。可是,當數據從map階段傳輸到reduce階段的時候,仍然產生了很高的網絡成本。

此外,Hadoop社區包支持多路鏈接,這裏的實現只支持二路鏈接。

若是要更多地減小reduce端鏈接的內存足跡(memory footprint),一個簡單的機制是在map函數中更多地進行投影操做。投影減小了map階段的輸出中的字段。例如:在分析用戶數據的時候,若是隻關注用戶的年齡,那麼在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其餘的字段。這樣就減小了map和reduce之間的網絡負擔,也減小了reduce在鏈接時的內存消耗。

和原始的社區包同樣,這裏的重分區的實現也支持過濾和投影。經過容許genMapOutputValue方法返回空值,就能夠支持過濾。經過在genMapOutputValue方法中定義輸出值的內容,就能夠支持投影。

若是你既想輸出全部的數據到reduce,又想避免排序的損耗,就須要考慮另外兩種鏈接策略,複製鏈接和半鏈接。

相關文章
相關標籤/搜索