用MapReduce進行數據密集型文本處理-本地聚合

本文譯自Working Through Data-Intensive Text Processing with MapReduce html

由於最近忙於 Coursera提供 的一些課程,我已經有一段時間沒有寫博客了。這些課程很是有意思,值得一看。我買了一本書《Data-Intensive Processing with MapReduce》,做者是Jimmy和Chris Dyer。書裏以僞碼形式總結了一些了MapReduce的重要算法。。我打算用真正的hadoop代碼來實現這本書中第3-6章中出現過的算法,以Tom White的《Hadoop經典指南》做爲參考。我假設本文的讀者已經瞭解Hadoop和MapReduce,因此本文再也不詳述基礎概念。讓咱們直接跳到第3章-MapReduce算法設計,從本地聚合開始。 git

本地聚合(Local Aggregation) github

從比較高的抽象層面上來說,mapper輸出數據的時候要先把中間結果寫到磁盤上,而後穿過網絡傳給reducer處理。對於一個mapreduce job來講,將數據寫磁盤以及以後的網絡傳輸的代價高昂,由於它們會大大增長延遲。因此,應該儘量減小mapper產生的數據量,這樣才能加快job的處理速度。本地聚合就是這樣一種減小中間數據量提升job效率的技術。本地聚合並不能代替reducer,由於reducer能夠彙集來自不一樣mapper的具備一樣key的數據。咱們有三種本地聚合的方法: 算法

1.使用Hadoop Combiner的功能 apache

2.Data-Intensive Processing with MapReduce這本書裏提到的兩種在mapper裏聚合的方法 api

固然任何優化都要考慮一些其餘因素,咱們將在後面討論這些。 網絡

爲了演示本地聚合,我在個人MacBookPro上用Cloudera的hadoop-0.20.2-cdh3u3搭建了了一個僞分佈集羣環境,咱們將用查爾斯狄更斯的小說《A Christmas Carol》來運行word count。我計劃之後在EC2上用更大的數據來作這個實驗。 app

Combiners

combiner功能由繼承了Reducer class的對象實現。事實上,在咱們的例子裏,咱們會重用word count中的reducer來做爲combiner。combiner 在配置MapReduce job的時候指定,就像這樣: 框架

1 job.setReducerClass(TokenCountReducer.class);

下面是reducer的代碼: 分佈式

01 public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
02     @Override
03     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throwsIOException, InterruptedException {
04         int count = 0;
05         for (IntWritable value : values) {
06               count+= value.get();
07         }
08         context.write(key,new IntWritable(count));
09     }
10 }

combiner的做用就如它的名字,聚合數據以儘可能減小shuffle階段的網絡傳輸量。如前所述,reducer仍然須要把來自不一樣mapper的一樣的key彙集起來。由於combiner功能只是對過程的一個優化,因此Hadoop框架不能保證combiner會被調用多少次。(配置了combinere就必定會執行,可是執行1次仍是n次是預先不肯定的)

在Mapper聚合的方法1

不用combiner的話,替代方法之一隻須要對咱們原來的word count mapper作一個小小的修改:

01 public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
02     @Override
03     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
04         IntWritable writableCount = new IntWritable();
05         Text text = new Text();
06         Map<String,Integer> tokenMap = new HashMap<String, Integer>();
07         StringTokenizer tokenizer = new StringTokenizer(value.toString());
08  
09         while(tokenizer.hasMoreElements()){
10             String token = tokenizer.nextToken();
11             Integer count = tokenMap.get(token);
12             if(count == null) count = new Integer(0);
13             count+=1;
14             tokenMap.put(token,count);
15         }
16  
17         Set<String> keys = tokenMap.keySet();
18         for (String s : keys) {
19              text.set(s);
20              writableCount.set(tokenMap.get(s));
21              context.write(text,writableCount);
22         }
23     }
24 }

如咱們所看到的,輸出的詞的計數再也不是1,咱們用一個map記錄處理過的每一個詞。處理完畢一行中的全部詞,而後遍歷這個map,輸出每一個詞在一行中的出現次數。

在Mapper聚合的方法2

mapper中聚合的第二種方法與上面的例子很是類似,但也有兩處不一樣 - 在何時創建hashmap以及何時輸出hashmap中的結果。在上面的例子裏,在每次調用map方法的時候建立map並在調用完成的時候輸出。在這個例子裏,咱們會把map做爲一個實例變量並在mapper的setUp方法裏初始化。一樣,map的內容要等到全部的map方法調用都完成以後,調用cleanUp方法的時候才輸出。

01 public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
02  
03     private  Map<String,Integer> tokenMap;
04  
05     @Override
06     protected void setup(Context context) throws IOException, InterruptedException {
07            tokenMap = new HashMap<String, Integer>();
08     }
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         StringTokenizer tokenizer = new StringTokenizer(value.toString());
13         while(tokenizer.hasMoreElements()){
14             String token = tokenizer.nextToken();
15             Integer count = tokenMap.get(token);
16             if(count == null) count = new Integer(0);
17             count+=1;
18             tokenMap.put(token,count);
19         }
20     }
21  
22  
23     @Override
24     protected void cleanup(Context context) throws IOException, InterruptedException {
25         IntWritable writableCount = new IntWritable();
26         Text text = new Text();
27         Set<String> keys = tokenMap.keySet();
28         for (String s : keys) {
29             text.set(s);
30             writableCount.set(tokenMap.get(s));
31             context.write(text,writableCount);
32         }
33     }
34 }
正如上面的代碼所示,在   mapper裏,跨越全部map方法調用,記錄每一個詞的出現次數。經過這樣作,大大減小了發送到reducer的記錄數量,可以減小MapReduce任務的運行時間。達到的效果與使用MapReduce框架的combiner功能相同,可是這種狀況下你要本身保證你的聚合代碼是正確的。可是使用這種方法的時候要注意,在map方法調用過程當中始終保持狀態是有問題的,這有悖於「map」功能的原義。並且,在map調用過程當中保持狀態也須要關注你的內存使用。總之,根據不一樣狀況來作權衡,選擇最合適的辦法。

結果

如今讓咱們來看一下不一樣mapper的結果。由於job運行在僞分佈式模式下,這個運行時間不足以參考,不過咱們仍然能夠推斷出使用了本地聚合以後是如何影響真實集羣上運行的MapReduce job的效率的。

每一個詞輸出一次的Mapper:

1 12/09/13 21:25:32 INFO mapred.JobClient:     Reduce shuffle bytes=366010
2 12/09/13 21:25:32 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:25:32 INFO mapred.JobClient:     Spilled Records=63118
4 12/09/13 21:25:32 INFO mapred.JobClient:     Map output bytes=302886

在mapper中聚合方法1:

1 12/09/13 21:28:15 INFO mapred.JobClient:     Reduce shuffle bytes=354112
2 12/09/13 21:28:15 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:28:15 INFO mapred.JobClient:     Spilled Records=60704
4 12/09/13 21:28:15 INFO mapred.JobClient:     Map output bytes=293402

在mapper中聚合方法2:

1 12/09/13 21:30:49 INFO mapred.JobClient:     Reduce shuffle bytes=105885
2 12/09/13 21:30:49 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:30:49 INFO mapred.JobClient:     Spilled Records=15314
4 12/09/13 21:30:49 INFO mapred.JobClient:     Map output bytes=90565

使用了Combiner:

1 12/09/13 21:22:18 INFO mapred.JobClient:     Reduce shuffle bytes=105885
2 12/09/13 21:22:18 INFO mapred.JobClient:     Reduce output records=7657
3 12/09/13 21:22:18 INFO mapred.JobClient:     Spilled Records=15314
4 12/09/13 21:22:18 INFO mapred.JobClient:     Map output bytes=302886
5 12/09/13 21:22:18 INFO mapred.JobClient:     Combine input records=31559
6 12/09/13 21:22:18 INFO mapred.JobClient:     Combine output records=7657

正如所料,沒有作任何聚合的Mapper效果最差,而後是「在mapper中聚合方法1」,差之了了。「在mapper中聚合方法2」與使用了combiner的結果很近似。比起前兩種方法,他們節省了2/3的shuffle字節數。這等於減小了一樣數量的網絡數據傳輸量,十分有利於提升MapReduce job的運行效率。不過要記住,方法2或者combiner並不必定可以應用於全部的MapReduce jobs word count很適合於這種場景,可是別的狀況可不必定。

結論

正如你看到的,使用mapper裏聚合方法和combiner是有好處的,不過當你在尋求提高MapReduce jobs的性能的時候你應該多考慮一些因素。至於選哪一種方法,這取決於你如何權衡。

本文是《Data Intensive Processing with MapReduce中的算法實現系列文章的最新一篇。該系列文章的第一篇在此 。在第一篇裏,咱們討論了使用本地聚合技術來減小shuffle階段的網絡傳輸數據量。減小須要傳輸的數據量是提升mapreduce job的性能的最有效的辦法。咱們在上一篇文章裏用了word count來演示本地聚合。由於咱們須要的只是一個最終統計結果,而在計算最終結果的過程當中改變累加的分組與順序都不會影響最終結果,因此咱們能夠重用reducer來做爲combiner。可是若是想計算平均值怎麼辦?這種狀況下原來的辦法就行不通了,由於整體的平均值不等於各部分平均值的平均。不過若是可以清楚的意識到這一點,咱們仍是可使用本地聚合方法的。在本文的例子中咱們將使用 在Hadoop經典指南》中出現過的 美國國家氣候中心的天氣數據  樣原本計算1901年每月的平均氣溫。使用combiner和mapper中聚合的計算平均值的算法可在《Data Intensive Processing with MapReduce》的3.1.3找到。

沒有放之四海而皆準的方法

咱們在上一篇文章裏介紹了兩種減小數據的方法,Hadoop Combiner和在mapper中聚合。Combiner被視爲是一個優化措施,所以框架不會保證它會被調用多少次。因此,mapper輸出的數據格式必須是符合reducer輸入格式的,以便在combiner根本沒有運行的狀況下最終結果仍是正確的。爲了計算平均氣溫,咱們須要改變一下mapper的輸出。

Mapper 的變化

在 word-count的例子裏,沒有優化的mapper輸出每一個詞和值爲1的計數。combiner和在mapper中聚合的方法經過一個hashmap,將每一個詞做爲key,出現次數做爲值,保存在hashmap中來減小輸出。若是combiner沒有調用,reducer將收到一系列key是單詞,值爲1的數據,這與以前的結果是同樣的。(若是使用在mapper中聚合的話就不會發生這種狀況,由於聚合是發生在mapper的代碼裏的,因此必定會被執行)。爲了計算平均值,咱們的mapper須要輸出一個字符串key(年月)和一個定製的實現了writable接口的對象, TemperatureAveragingPair。這個對象有兩個數字屬性,氣溫以及該氣溫的頻數。咱們能夠參考《Hadoop經典指南》中的 MaximumTemperatureMapper  來創建 AverageTemperatureMapper:
01 public class AverageTemperatureMapper extendsMapper<LongWritable, Text, Text, TemperatureAveragingPair> {
02  //sample line of weather data
03  //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
04  
05  
06     private Text outText = new Text();
07     private TemperatureAveragingPair pair = new TemperatureAveragingPair();
08     private static final int MISSING = 9999;
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         String line = value.toString();
13         String yearMonth = line.substring(15, 21);
14  
15         int tempStartPosition = 87;
16  
17         if (line.charAt(tempStartPosition) == '+') {
18             tempStartPosition += 1;
19         }
20  
21         int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
22  
23         if (temp != MISSING) {
24             outText.set(yearMonth);
25             pair.set(temp, 1);
26             context.write(outText, pair);
27         }
28     }
29 }

經過讓mapper輸出key和TemperatureAveragingPair對象,無論combiner有沒有執行咱們的mapreduce程序都能輸出正確的結果。

Combiner

咱們要減小傳輸的數據量,就要儘量把相同氣溫的計數合併,可是又不能影響最終的平均數計算。當combiner執行的時候,它會把具備相同key的TemperatureAveragingPair 合併成一個,包含彙總的氣溫和頻度計數。下面是combiner的代碼:

01   public class AverageTemperatureCombiner extendsReducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {
02     private TemperatureAveragingPair pair = new TemperatureAveragingPair();
03  
04     @Override
05     protected voidreduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throwsIOException, InterruptedException {
06         int temp = 0;
07         int count = 0;
08         for (TemperatureAveragingPair value : values) {
09              temp += value.getTemp().get();
10              count += value.getCount().get();
11         }
12         pair.set(temp,count);
13         context.write(key,pair);
14     }
15 }

可是咱們確實很關心如何減小須要傳輸給reducer的數據量,下面咱們將會看看如何實現這個目的。

在mapper中合併平均值

與word-count相同,爲了計算均值,在mapper中合併的方法會用到一個hashmap,它以年月爲key,以TemperatureAveragingPair爲值。合併相同年月的數據的時候咱們須要取出以該年月爲key的TemperatureAveragingPair對象,將temperature屬性和count屬性累加。最後在cleanUp方法被調用的時候會輸出hashmap中全部的key和TemperatureAveragingPair。

01 public class AverageTemperatureCombiningMapper extendsMapper<LongWritable, Text, Text, TemperatureAveragingPair> {
02  //sample line of weather data
03  //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
04  
05  
06     private static final int MISSING = 9999;
07     private Map<String,TemperatureAveragingPair> pairMap = newHashMap<String,TemperatureAveragingPair>();
08  
09  
10     @Override
11     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
12         String line = value.toString();
13         String yearMonth = line.substring(15, 21);
14  
15         int tempStartPosition = 87;
16  
17         if (line.charAt(tempStartPosition) == '+') {
18             tempStartPosition += 1;
19         }
20  
21         int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
22  
23         if (temp != MISSING) {
24             TemperatureAveragingPair pair = pairMap.get(yearMonth);
25             if(pair == null){
26                 pair = new TemperatureAveragingPair();
27                 pairMap.put(yearMonth,pair);
28             }
29             int temps = pair.getTemp().get() + temp;
30             int count = pair.getCount().get() + 1;
31             pair.set(temps,count);
32         }
33     }
34  
35  
36     @Override
37     protected void cleanup(Context context) throws IOException, InterruptedException {
38         Set<String> keys = pairMap.keySet();
39         Text keyText = new Text();
40         for (String key : keys) {
41              keyText.set(key);
42              context.write(keyText,pairMap.get(key));
43         }
44     }
45 }

用這種在mapper中合併的方法,咱們在屢次map調用之間保存了信息,確保了可以對產出數據進行削減。儘管保持跨mapper的狀態是一件須要當心的事情,但這在某些狀況下確實頗有效。

Reducer

在這種狀況reducer的邏輯就很簡單了,遍歷每一個key的全部值,把temperatures 和counts加和,而後相除。

01 public class AverageTemperatureReducer extendsReducer<Text, TemperatureAveragingPair, Text, IntWritable> {
02     private IntWritable average = new IntWritable();
03  
04     @Override
05     protected voidreduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throwsIOException, InterruptedException {
06         int temp = 0;
07         int count = 0;
08         for (TemperatureAveragingPair pair : values) {
09             temp += pair.getTemp().get();
10             count += pair.getCount().get();
11         }
12         average.set(temp / count);
13         context.write(key, average);
14     }
15 }

結果

正如預料,使用了combiner和mapper中合併方法的結果大幅減小了輸出數據。
沒有優化的狀況:

01 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
03 12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
05 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
07 12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
08 12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
10 12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
11 12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

使用了Combiner的狀況:

01 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
03 12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
05 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
07 12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
08 12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
10 12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
11 12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

在mapper中合併的狀況:

01 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
02 12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
03 12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
04 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
05 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
06 12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
07 12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
08 12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
09 12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
10 12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
11 12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12 12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

計算結果:
(注意: 例子裏使用的文件中的的溫度是攝氏度*10的結果)

Non-Optimized Combiner       In-Mapper-Combiner Mapper
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77

結論

咱們用兩種場景來演示了本地聚合,一個場景比較簡單隻要簡單重用reducer做爲combiner就能夠,另外一個稍微複雜一些,必須對數據作必定的組織,這兩種例子都充分證實了本地聚合可以極大提升處理過程的效率。

相關資源

相關文章
相關標籤/搜索