MapReduce 算法 - 反轉排序 (Order Inversion)

本文另外一地址請見MapReduce算法-反轉排序 算法

本文譯自 MapReduce Algorithms - Order Inversion排序 app

譯者注:在剛開始翻譯的時候,我將Order Inversion按照字面意思翻譯成「反序」或者「倒序」,可是翻譯完整篇文章以後,我感受到,將Order Inversion翻譯成反序模式是不恰當的,根據本文的內容,很顯然,Inversion並不是是將順序倒排的意思,而是如同Spring的IOC同樣,代表的是一種控制權的反轉。Spring將對象的實例化責任從業務代碼反轉給了框架,而在本文的模式中,在mapreduce的sorting過程當中,原來由框架負責的數據的排序以及shuffle規則被用戶定製化了,控制權從框架反轉到了user,實際上這種模式就是由用戶控制sorting過程的意思
框架

本文是一系列有關MapReduce算法的文章中的一篇,這些算法都在《Data-Intensive Text Processing with MapReduce》中提到過。這系列文章在本文以前已經發表的有 本地聚合本地聚合二 和 創建共生矩陣。在這篇文章裏咱們要討論的是排序反轉模式。這種模式利用MapReduce的排序(sorting)階段,讓一部分數據提早發送到reducer端以利於後續計算,若是你對MapReduce瞭解很少,我勸你讀下去,由於我將展現給你如何使用排序(sorting)和partitioner來實現咱們的目的,這將會大有益處。 ide

儘管已經有許多MapReduce框架提供了高層次的抽象,例如Hive和Pig,理解底層是如何運行的仍然是有好處的。反序模式出如今《Data-Intensive Text Processing with MapReduce》這本書的第三章, 爲了說明反序模式,咱們要用共生矩陣模式中出現過的配對方法。創建共生矩陣的時候咱們能夠記錄下詞共同出現的次數,我門會對配對方法作一個小小的修改,mapper不止輸出諸如(「foo」,」bar」) 這樣的詞對,還會額外輸出(「foo」,」*」)這樣的詞對,對於每一個詞都依此法辦理,這樣能夠很容易的得出左邊的這個詞的總共出現次數,用這個就能夠計算出相對頻率。這種方法會帶來兩個問題,首先咱們須要想辦法保證讓 (「foo」,」*」) 成爲reducer 的第一條記錄,其次咱們要保證左邊的詞相同的全部的詞對都被同一個reducer所處理,咱們先來看mapper代碼再解決這兩個問題。 工具


Mapper Code

首先咱們要對mapper作一些有別於配對方法的修改。在每次循環的最後,輸出了某個詞的全部的詞對以後,輸出一個特殊的詞對(「word」,」*」), 計數就是這個詞做爲左邊詞的詞對出現的次數。 this

01 public class PairsRelativeOccurrenceMapper extendsMapper<LongWritable, Text, WordPair, IntWritable> {
02     private WordPair wordPair = new WordPair();
03     private IntWritable ONE = new IntWritable(1);
04     private IntWritable totalCount = new IntWritable();
05  
06     @Override
07     protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {
08         int neighbors = context.getConfiguration().getInt('neighbors', 2);
09         String[] tokens = value.toString().split('\\s+');
10         if (tokens.length > 1) {
11             for (int i = 0; i < tokens.length; i++) {
12                     tokens[i] = tokens[i].replaceAll('\\W+','');
13  
14                     if(tokens[i].equals('')){
15                         continue;
16                     }
17  
18                     wordPair.setWord(tokens[i]);
19  
20                     int start = (i - neighbors < 0) ? 0 : i - neighbors;
21                     int end = (i + neighbors >= tokens.length) ? tokens.length - 1 : i + neighbors;
22                     for (int j = start; j <= end; j++) {
23                         if (j == i) continue;
24                         wordPair.setNeighbor(tokens[j].replaceAll('\\W',''));
25                         context.write(wordPair, ONE);
26                     }
27                     wordPair.setNeighbor('*');
28                     totalCount.set(end - start);
29                     context.write(wordPair, totalCount);
30             }
31         }
32     }
33 }

如今咱們找到了統計特定詞出現次數的辦法,咱們還須要想辦法讓這個特定的詞對稱爲reduce處理的第一條記錄以便計算相對頻度。咱們能夠經過修改WordPair對象的compareTo方法MapReduce 的sorting階段來實現這個目的。 spa

修改排序

修改WordPair類的compareTo方法,讓發現 「*」 爲右詞的對象排到前列。 .net

01 @Override
02 public int compareTo(WordPair other) {
03     int returnVal = this.word.compareTo(other.getWord());
04     if(returnVal != 0){
05         return returnVal;
06     }
07     if(this.neighbor.toString().equals('*')){
08         return -1;
09     }else if(other.getNeighbor().toString().equals('*')){
10         return 1;
11     }
12     return this.neighbor.compareTo(other.getNeighbor());
13 }

經過修改compareTo方法,咱們能夠保證含有特殊字符的WordPair 都排在比較靠前的位置並會首先被reducer處理。這引出了第二個問題,咱們怎樣使具備相同左詞的全部WordPai對象被髮送到同一個reducer? 答案是定製一個partitioner。 翻譯

定製 Partitioner

用key的hashcode對reducer數取模,就把key分配到了不一樣的reducer,這就是shuffle過程。但咱們的WordPair 對象包含2個詞,計算整個對象的hashcode是行不通的。咱們須要寫一個本身的Partitioner, 它在選擇將輸出發送到哪一個reducer的時候只考慮左邊的詞。 code

1 public class WordPairPartitioner extends Partitioner<WordPair,IntWritable> {
2  
3     @Override
4     public int getPartition(WordPair wordPair, IntWritable intWritable, int numPartitions) {
5         return wordPair.getWord().hashCode() % numPartitions;
6     }
7 }

Reducer

寫一個reducer來實現倒序模式很簡單。引入一個計數變量以及一個表示當前詞的「current」變量。reducer會檢查做爲輸入key的WordPair 右邊是否是特殊字符「*」。假如左邊的詞不等於「current」表示的詞就重置計數變量,而且計算current表示的詞的總次數。而後處理下一個WordPair對象,在同一個current範圍內,計數之和與各個不一樣右詞的計數結合就能夠獲得相對頻率。繼續這個過程直到發現另外一個詞(左詞)而後再從新開始。

01 public class PairsRelativeOccurrenceReducer extendsReducer<WordPair, IntWritable, WordPair, DoubleWritable> {
02     private DoubleWritable totalCount = new DoubleWritable();
03     private DoubleWritable relativeCount = new DoubleWritable();
04     private Text currentWord = new Text('NOT_SET');
05     private Text flag = new Text('*');
06  
07     @Override
08     protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throwsIOException, InterruptedException {
09         if (key.getNeighbor().equals(flag)) {
10             if (key.getWord().equals(currentWord)) {
11                 totalCount.set(totalCount.get() + getTotalCount(values));
12             } else {
13                 currentWord.set(key.getWord());
14                 totalCount.set(0);
15                 totalCount.set(getTotalCount(values));
16             }
17         } else {
18             int count = getTotalCount(values);
19             relativeCount.set((double) count / totalCount.get());
20             context.write(key, relativeCount);
21         }
22     }
23   private int getTotalCount(Iterable<IntWritable> values) {
24         int count = 0;
25         for (IntWritable value : values) {
26             count += value.get();
27         }
28         return count;
29     }
30 }

經過控制sort階段的邏輯和創建定製partitioner,咱們能夠把執行計算的reducer須要的數據在計算所需的數據到達以前發送到reducer,雖然這裏沒有展現,不過combiner在MapReduce中是常常會用到的。並且這個方法(使用combiner)也是mapper端合併模式的的一個很是好的實現。

如今咱們能夠保證有着相同左詞的全部WordPair對象都被髮到了同一個reducer。剩下的就是創建一個reducer來使用發送到reducer的數據。

例子和結果

在假期的這段時間裏,我用查爾斯狄更斯的小說《聖誕頌歌》做爲樣例來運行了一下反序模式。我知道這可能沒什麼實際意義,但咱們的目的就是這樣。

01 new-host-2:sbin bbejeck$ hdfs dfs -cat relative/part* | grep Humbug
02 {word=[Humbug] neighbor=[Scrooge]}  0.2222222222222222
03 {word=[Humbug] neighbor=[creation]} 0.1111111111111111
04 {word=[Humbug] neighbor=[own]}  0.1111111111111111
05 {word=[Humbug] neighbor=[said]} 0.2222222222222222
06 {word=[Humbug] neighbor=[say]}  0.1111111111111111
07 {word=[Humbug] neighbor=[to]}   0.1111111111111111
08 {word=[Humbug] neighbor=[with]} 0.1111111111111111
09 {word=[Scrooge] neighbor=[Humbug]}  0.0020833333333333333
10 {word=[creation] neighbor=[Humbug]} 0.1
11 {word=[own] neighbor=[Humbug]}  0.006097560975609756
12 {word=[said] neighbor=[Humbug]} 0.0026246719160104987
13 {word=[say] neighbor=[Humbug]}  0.010526315789473684
14 {word=[to] neighbor=[Humbug]}   3.97456279809221E-4
15 {word=[with] neighbor=[Humbug]} 9.372071227741331E-4

結論

即便在工做中計算相對詞頻的需求可能並不常見,咱們也可以用這個來展現sorting和定製partitioner的用法,這但是咱們寫 MapReduce 程序時候的得力工具。如前所述,即便你的MapReduce都是用像Hive和Pig這樣的高層次抽象語言寫成的,瞭解一些底層的機制仍然是有好處的,謝謝。

相關文章
相關標籤/搜索