Hadoop之MapReduce自定義二次排序流程實例詳解

1.如何解決MapReduce二次排序?
2.Map端如何處理?
3.Reduce端如何處理?
4.MapReduce二次排序是如何具體實現的呢?

1、概述
MapReduce框架對處理結果的輸出會根據key值進行默認的排序,這個默認排序能夠知足一部分需求,可是也是十分有限的。在咱們實際的需求當中,每每有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網絡上已經有不少人分享過了,可是對二次排序的實現的原理以及整個MapReduce框架的處理流程的分析仍是有很是大的出入,並且部分分析是沒有通過驗證的。本文將經過一個實際的MapReduce二次排序例子,講述二次排序的實現和其MapReduce的整個處理流程,而且經過結果和map、reduce端的日誌來驗證所描述的處理流程的正確性。
2、需求描述
一、輸入數據:
sort1    1
sort2    3
sort2    77
sort2    54
sort1    2
sort6    22
sort6    221
sort6    20
二、目標輸出
sort1 1,2
sort2 3,54,77
sort6 20,22,221

3、解決思路
  一、首先,在思考解決問題思路時,咱們先應該深入的理解MapReduce處理數據的整個流程,這是最基礎的,否則的話是不可能找到解決問題的思路的。我描述一下MapReduce處理數據的大概簡單流程:首先,MapReduce框架經過getSplit方法實現對原始文件的切片以後,每個切片對應着一個map task,inputSplit輸入到Map函數進行處理,中間結果通過環形緩衝區的排序,而後分區、自定義二次排序(若是有的話)和合並,再經過shuffle操做將數據傳輸到reduce task端,reduce端也存在着緩衝區,數據也會在緩衝區和磁盤中進行合併排序等操做,而後對數據按照Key值進行分組,而後沒處理完一個分組以後就會去調用一次reduce函數,最終輸出結果。大概流程我畫了一下,以下圖:java

二、具體解決思路

(1)Map端處理:

  根據上面的需求,咱們有一個很是明確的目標就是要對第一列相同的記錄合併,而且對合並後的數字進行排序。咱們都知道MapReduce框架不論是默認排序或者是自定義排序都只是對Key值進行排序,如今的狀況是這些數據不是key值,怎麼辦?其實咱們能夠將原始數據的Key值和其對應的數據組合成一個新的Key值,而後新的Key值對應的仍是以前的數字。那麼咱們就能夠將原始數據的map輸出變成相似下面的數據結構:
{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那麼咱們只須要對[]裏面的新key值進行排序就ok了。而後咱們須要自定義一個分區處理器,由於個人目標不是想將新key相同的傳到同一個reduce中,而是想將新key中的第一個字段相同的才放到同一個reduce中進行分組合並,因此咱們須要根據新key值中的第一個字段來自定義一個分區處理器。經過分區操做後,獲得的數據流以下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}

分區操做完成以後,我調用本身的自定義排序器對新的Key值進行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}

(2)Reduce端處理:
  通過Shuffle處理以後,數據傳輸到Reducer端了。在Reducer端對按照組合鍵的第一個字段來進行分組,而且沒處理完一次分組以後就會調用一次reduce函數來對這個分組進行處理輸出。最終的各個分組的數據結構變成相似下面的數據結構:
{sort1,[1,2]}
{sort2,[3,54,77]}
{sort6,[20,22,221]}

4、具體實現
一、自定義組合鍵
package com.mr; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.Hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
* 自定義組合鍵 
* @author zenghzhaozheng 
*/
public class CombinationKey implements WritableComparable<CombinationKey>{ 
    private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class); 
    private Text firstKey; 
    private IntWritable secondKey; 
    public CombinationKey() { 
        this.firstKey = new Text(); 
        this.secondKey = new IntWritable(); 
    } 
    public Text getFirstKey() { 
        return this.firstKey; 
    } 
    public void setFirstKey(Text firstKey) { 
        this.firstKey = firstKey; 
    } 
    public IntWritable getSecondKey() { 
        return this.secondKey; 
    } 
    public void setSecondKey(IntWritable secondKey) { 
        this.secondKey = secondKey; 
    } 
    @Override
    public void readFields(DataInput dateInput) throws IOException { 
        // TODO Auto-generated method stub 
        this.firstKey.readFields(dateInput); 
        this.secondKey.readFields(dateInput); 
    } 
    @Override
    public void write(DataOutput outPut) throws IOException { 
        this.firstKey.write(outPut); 
        this.secondKey.write(outPut); 
    } 
    /** 
    * 自定義比較策略 
    * 注意:該比較策略用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段, 
    * 發生地點爲環形緩衝區(能夠經過io.sort.mb進行大小調整) 
    */
    @Override
    public int compareTo(CombinationKey combinationKey) { 
        logger.info("-------CombinationKey flag-------"); 
        return this.firstKey.compareTo(combinationKey.getFirstKey()); 
    } 
}
說明:在自定義組合鍵的時候,咱們須要特別注意,必定要實現WritableComparable接口,而且實現compareTo方法的比較策略。這個用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段,發生地點爲環形緩衝區(能夠經過io.sort.mb進行大小調整),可是其對咱們最終的二次排序結果是沒有影響的。咱們二次排序的最終結果是由咱們的自定義比較器決定的。
二、自定義分區器
package com.mr; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
* 自定義分區 
* @author zengzhaozheng 
*/
public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{ 
    private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class); 
    /** 
    *  數據輸入來源:map輸出 
    * @author zengzhaozheng 
    * @param key map輸出鍵值 
    * @param value map輸出value值 
    * @param numPartitions 分區總數,即reduce task個數 
    */
    @Override
    public int getPartition(CombinationKey key, IntWritable value,int numPartitions) { 
        logger.info("--------enter DefinedPartition flag--------"); 
        /** 
        * 注意:這裏採用默認的hash分區實現方法 
        * 根據組合鍵的第一個值做爲分區 
        * 這裏須要說明一下,若是不自定義分區的話,mapreduce框架會根據默認的hash分區方法, 
        * 將整個組合將相等的分到一個分區中,這樣的話顯然不是咱們要的效果 
        */
        logger.info("--------out DefinedPartition flag--------"); 
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; 
    } 
}
說明:具體說明看代碼註釋。

三、自定義比較器
package com.mr; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
* 自定義二次排序策略 
* @author zengzhaoheng 
*/
public class DefinedComparator extends WritableComparator { 
    private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class); 
    public DefinedComparator() { 
        super(CombinationKey.class,true); 
    } 
    @Override
    public int compare(WritableComparable combinationKeyOne, 
            WritableComparable CombinationKeyOther) { 
        logger.info("---------enter DefinedComparator flag---------"); 
                                                      
        CombinationKey c1 = (CombinationKey) combinationKeyOne; 
        CombinationKey c2 = (CombinationKey) CombinationKeyOther; 
                                                      
        /** 
        * 確保進行排序的數據在同一個區內,若是不在同一個區則按照組合鍵中第一個鍵排序 
        * 另外,這個判斷是能夠調整最終輸出的組合鍵第一個值的排序 
        * 下面這種比較對第一個字段的排序是升序的,若是想降序這將c1和c2���過來(假設1) 
        */
        if(!c1.getFirstKey().equals(c2.getFirstKey())){ 
            logger.info("---------out DefinedComparator flag---------"); 
            return c1.getFirstKey().compareTo(c2.getFirstKey()); 
            } 
        else{//按照組合鍵的第二個鍵的升序排序,將c1和c2倒過來則是按照數字的降序排序(假設2) 
            logger.info("---------out DefinedComparator flag---------"); 
            return c1.getSecondKey().get()-c2.getSecondKey().get();//0,負數,正數 
        } 
        /** 
        * (1)按照上面的這種實現最終的二次排序結果爲: 
        * sort1    1,2 
        * sort2    3,54,77 
        * sort6    20,22,221 
        * (2)若是實現假設1,則最終的二次排序結果爲: 
        * sort6    20,22,221 
        * sort2    3,54,77 
        * sort1    1,2 
        * (3)若是實現假設2,則最終的二次排序結果爲: 
        * sort1    2,1 
        * sort2    77,54,3 
        * sort6    221,22,20 
        */
        } 
}

說明:自定義比較器決定了咱們二次排序的結果。自定義比較器須要繼承WritableComparator類,而且重寫compare方法實現本身的比較策略。具體的排序問題請看註釋。
linux

相關文章
相關標籤/搜索