本文另外一地址請見MapReduce算法-反轉排序 算法
本文譯自 MapReduce Algorithms - Order Inversion排序 app
譯者注:在剛開始翻譯的時候,我將Order Inversion按照字面意思翻譯成「反序」或者「倒序」,可是翻譯完整篇文章以後,我感受到,將Order Inversion翻譯成反序模式是不恰當的,根據本文的內容,很顯然,Inversion並不是是將順序倒排的意思,而是如同Spring的IOC同樣,代表的是一種控制權的反轉。Spring將對象的實例化責任從業務代碼反轉給了框架,而在本文的模式中,在mapreduce的sorting過程當中,原來由框架負責的數據的排序以及shuffle規則被用戶定製化了,控制權從框架反轉到了user,實際上這種模式就是由用戶控制sorting過程的意思
框架
本文是一系列有關MapReduce算法的文章中的一篇,這些算法都在《Data-Intensive Text Processing with MapReduce》中提到過。這系列文章在本文以前已經發表的有 本地聚合, 本地聚合二 和 創建共生矩陣。在這篇文章裏咱們要討論的是排序反轉模式。這種模式利用MapReduce的排序(sorting)階段,讓一部分數據提早發送到reducer端以利於後續計算,若是你對MapReduce瞭解很少,我勸你讀下去,由於我將展現給你如何使用排序(sorting)和partitioner來實現咱們的目的,這將會大有益處。 ide
儘管已經有許多MapReduce框架提供了高層次的抽象,例如Hive和Pig,理解底層是如何運行的仍然是有好處的。反序模式出如今《Data-Intensive Text Processing with MapReduce》這本書的第三章, 爲了說明反序模式,咱們要用共生矩陣模式中出現過的配對方法。創建共生矩陣的時候咱們能夠記錄下詞共同出現的次數,我門會對配對方法作一個小小的修改,mapper不止輸出諸如(「foo」,」bar」) 這樣的詞對,還會額外輸出(「foo」,」*」)這樣的詞對,對於每一個詞都依此法辦理,這樣能夠很容易的得出左邊的這個詞的總共出現次數,用這個就能夠計算出相對頻率。這種方法會帶來兩個問題,首先咱們須要想辦法保證讓 (「foo」,」*」) 成爲reducer 的第一條記錄,其次咱們要保證左邊的詞相同的全部的詞對都被同一個reducer所處理,咱們先來看mapper代碼再解決這兩個問題。 工具
Mapper Code
首先咱們要對mapper作一些有別於配對方法的修改。在每次循環的最後,輸出了某個詞的全部的詞對以後,輸出一個特殊的詞對(「word」,」*」), 計數就是這個詞做爲左邊詞的詞對出現的次數。 this
01 |
public class PairsRelativeOccurrenceMapper extendsMapper<LongWritable, Text, WordPair, IntWritable> { |
02 |
private WordPair wordPair = new WordPair(); |
03 |
private IntWritable ONE = new IntWritable(1); |
04 |
private IntWritable totalCount = new IntWritable(); |
07 |
protected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException { |
08 |
int neighbors = context.getConfiguration().getInt('neighbors', 2); |
09 |
String[] tokens = value.toString().split('\\s+'); |
10 |
if (tokens.length > 1) { |
11 |
for (int i = 0; i < tokens.length; i++) { |
12 |
tokens[i] = tokens[i].replaceAll('\\W+',''); |
14 |
if(tokens[i].equals('')){ |
18 |
wordPair.setWord(tokens[i]); |
20 |
int start = (i - neighbors < 0) ? 0 : i - neighbors; |
21 |
int end = (i + neighbors >= tokens.length) ? tokens.length - 1 : i + neighbors; |
22 |
for (int j = start; j <= end; j++) { |
24 |
wordPair.setNeighbor(tokens[j].replaceAll('\\W','')); |
25 |
context.write(wordPair, ONE); |
27 |
wordPair.setNeighbor('*'); |
28 |
totalCount.set(end - start); |
29 |
context.write(wordPair, totalCount); |
如今咱們找到了統計特定詞出現次數的辦法,咱們還須要想辦法讓這個特定的詞對稱爲reduce處理的第一條記錄以便計算相對頻度。咱們能夠經過修改WordPair對象的compareTo方法在MapReduce 的sorting階段來實現這個目的。 spa
修改排序
修改WordPair類的compareTo方法,讓發現 「*」 爲右詞的對象排到前列。 .net
02 |
public int compareTo(WordPair other) { |
03 |
int returnVal = this.word.compareTo(other.getWord()); |
07 |
if(this.neighbor.toString().equals('*')){ |
09 |
}else if(other.getNeighbor().toString().equals('*')){ |
12 |
return this.neighbor.compareTo(other.getNeighbor()); |
經過修改compareTo方法,咱們能夠保證含有特殊字符的WordPair 都排在比較靠前的位置並會首先被reducer處理。這引出了第二個問題,咱們怎樣使具備相同左詞的全部WordPai對象被髮送到同一個reducer? 答案是定製一個partitioner。 翻譯
定製 Partitioner
用key的hashcode對reducer數取模,就把key分配到了不一樣的reducer,這就是shuffle過程。但咱們的WordPair 對象包含2個詞,計算整個對象的hashcode是行不通的。咱們須要寫一個本身的Partitioner, 它在選擇將輸出發送到哪一個reducer的時候只考慮左邊的詞。 code
1 |
public class WordPairPartitioner extends Partitioner<WordPair,IntWritable> { |
4 |
public int getPartition(WordPair wordPair, IntWritable intWritable, int numPartitions) { |
5 |
return wordPair.getWord().hashCode() % numPartitions; |
Reducer
寫一個reducer來實現倒序模式很簡單。引入一個計數變量以及一個表示當前詞的「current」變量。reducer會檢查做爲輸入key的WordPair 右邊是否是特殊字符「*」。假如左邊的詞不等於「current」表示的詞就重置計數變量,而且計算current表示的詞的總次數。而後處理下一個WordPair對象,在同一個current範圍內,計數之和與各個不一樣右詞的計數結合就能夠獲得相對頻率。繼續這個過程直到發現另外一個詞(左詞)而後再從新開始。
01 |
public class PairsRelativeOccurrenceReducer extendsReducer<WordPair, IntWritable, WordPair, DoubleWritable> { |
02 |
private DoubleWritable totalCount = new DoubleWritable(); |
03 |
private DoubleWritable relativeCount = new DoubleWritable(); |
04 |
private Text currentWord = new Text('NOT_SET'); |
05 |
private Text flag = new Text('*'); |
08 |
protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throwsIOException, InterruptedException { |
09 |
if (key.getNeighbor().equals(flag)) { |
10 |
if (key.getWord().equals(currentWord)) { |
11 |
totalCount.set(totalCount.get() + getTotalCount(values)); |
13 |
currentWord.set(key.getWord()); |
15 |
totalCount.set(getTotalCount(values)); |
18 |
int count = getTotalCount(values); |
19 |
relativeCount.set((double) count / totalCount.get()); |
20 |
context.write(key, relativeCount); |
23 |
private int getTotalCount(Iterable<IntWritable> values) { |
25 |
for (IntWritable value : values) { |
經過控制sort階段的邏輯和創建定製partitioner,咱們能夠把執行計算的reducer須要的數據在計算所需的數據到達以前發送到reducer,雖然這裏沒有展現,不過combiner在MapReduce中是常常會用到的。並且這個方法(使用combiner)也是mapper端合併模式的的一個很是好的實現。
如今咱們能夠保證有着相同左詞的全部WordPair對象都被髮到了同一個reducer。剩下的就是創建一個reducer來使用發送到reducer的數據。
例子和結果
在假期的這段時間裏,我用查爾斯狄更斯的小說《聖誕頌歌》做爲樣例來運行了一下反序模式。我知道這可能沒什麼實際意義,但咱們的目的就是這樣。
01 |
new-host-2:sbin bbejeck$ hdfs dfs -cat relative/part* | grep Humbug |
02 |
{word=[Humbug] neighbor=[Scrooge]} 0.2222222222222222 |
03 |
{word=[Humbug] neighbor=[creation]} 0.1111111111111111 |
04 |
{word=[Humbug] neighbor=[own]} 0.1111111111111111 |
05 |
{word=[Humbug] neighbor=[said]} 0.2222222222222222 |
06 |
{word=[Humbug] neighbor=[say]} 0.1111111111111111 |
07 |
{word=[Humbug] neighbor=[to]} 0.1111111111111111 |
08 |
{word=[Humbug] neighbor=[with]} 0.1111111111111111 |
09 |
{word=[Scrooge] neighbor=[Humbug]} 0.0020833333333333333 |
10 |
{word=[creation] neighbor=[Humbug]} 0.1 |
11 |
{word=[own] neighbor=[Humbug]} 0.006097560975609756 |
12 |
{word=[said] neighbor=[Humbug]} 0.0026246719160104987 |
13 |
{word=[say] neighbor=[Humbug]} 0.010526315789473684 |
14 |
{word=[to] neighbor=[Humbug]} 3.97456279809221E-4 |
15 |
{word=[with] neighbor=[Humbug]} 9.372071227741331E-4 |
結論
即便在工做中計算相對詞頻的需求可能並不常見,咱們也可以用這個來展現sorting和定製partitioner的用法,這但是咱們寫 MapReduce 程序時候的得力工具。如前所述,即便你的MapReduce都是用像Hive和Pig這樣的高層次抽象語言寫成的,瞭解一些底層的機制仍然是有好處的,謝謝。