Hadoop MapReduce 二次排序原理及其應用

關於二次排序主要涉及到這麼幾個東西: html

0.20.0 之前使用的是 java

setPartitionerClass  git

setOutputkeyComparatorClass github

setOutputValueGroupingComparator  sql

 在0.20.0之後使用是 shell

job.setPartitionerClass(Partitioner p); apache

job.setSortComparatorClass(RawComparator c); 編程

job.setGroupingComparatorClass(RawComparator c); cookie

下面的例子裏面只用到了 setGroupingComparatorClass session

http://blog.csdn.net/heyutao007/article/details/5890103 

mr自帶的例子中的源碼SecondarySort,我從新寫了一下,基本沒變。 
這個例子中定義的map和reduce以下,關鍵是它對輸入輸出類型的定義:(java泛型編程) 
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> 
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> 

一、首先說一下工做原理: 

在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的字節偏移量做爲key,這一行的文本做爲value。這就是自定義Map的輸入是<LongWritable, Text>的緣由。而後調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最後,會先調用job.setPartitionerClass對這個List進行分區,每一個分區映射到一個reducer。每一個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。能夠看到,這自己就是一個二次排序。 若是沒有經過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。 在第一個例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函數類。 

在reduce階段,reducer接收到全部映射到這個reducer的map輸出後,也是會調用job.setSortComparatorClass設置的key比較函數類對全部數據對排序。而後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的全部key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是全部的(key和它的value迭代器)。一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。 

二、二次排序 

就是首先按照第一字段排序,而後再對第一字段相同的行按照第二字段排序,注意不能破壞第一次排序 的結果 。例如 :

echo "3 b
1 c
2 a
1 d
3 a"|sort -k1 -k2
1 c
1 d
2 a
3 a
3 b

三、具體步驟: 

1 自定義key。 

在mr中,全部的key是須要被比較和排序的,而且是二次,先根據partitione,再根據大小。而本例中也是要比較兩次。先按照第一字段排序,而後再對第一字段相同的按照第二字段排序。根據這一點,咱們能夠構造一個複合類IntPair,他有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。 
全部自定義的key應該實現接口WritableComparable,由於是可序列的而且可比較的。並重載方法 
//反序列化,從流中的二進制轉換成IntPair 
public void readFields(DataInput in) throws IOException 
        
//序列化,將IntPair轉化成使用流傳送的二進制 
public void write(DataOutput out) 

//key的比較 
public int compareTo(IntPair o) 
        
另外新定義的類應該重寫的兩個方法 
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) 
public int hashCode() 
public boolean equals(Object right) 

2 因爲key是自定義的,因此還須要自定義一下類: 

2.1 分區函數類。這是key的第一次比較。 

public static class FirstPartitioner extends Partitioner<IntPair,IntWritable> 

在job中設置使用setPartitionerClasss 

2.2 key比較函數類。這是key的第二次比較。這是一個比較器,須要繼承WritableComparator。 
public static class KeyComparator extends WritableComparator 
必須有一個構造函數,而且重載 public int compare(WritableComparable w1, WritableComparable w2) 
另外一種方法是 實現接口RawComparator。 
在job中設置使用setSortComparatorClass。 

2.3 分組函數類。在reduce階段,構造一個key對應的value迭代器的時候,只要first相同就屬於同一個組,放在一個value迭代器。這是一個比較器,須要繼承WritableComparator。 
public static class GroupingComparator extends WritableComparator 
同key比較函數類,必須有一個構造函數,而且重載 public int compare(WritableComparable w1, WritableComparable w2) 
同key比較函數類,分組函數類另外一種方法是實現接口RawComparator。 
在job中設置使用setGroupingComparatorClass。 

另外注意的是,若是reduce的輸入與輸出不是同一種類型,則不要定義Combiner也使用reduce,由於Combiner的輸出是reduce的輸入。除非從新定義一個Combiner。 

4 代碼:

這個例子中沒有使用key比較函數類,而是使用key的實現的compareTo方法

package SecondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SecondarySort
{
    //本身定義的key類應該實現WritableComparable接口
    public static class IntPair implements WritableComparable<IntPair>
    {
        String first;
        String second;
        /**
         * Set the left and right values.
         */
        public void set(String left, String right)
        {
            first = left;
            second = right;
        }
        public String getFirst()
        {
            return first;
        }
        public String getSecond()
        {
            return second;
        }
        //反序列化,從流中的二進制轉換成IntPair
        public void readFields(DataInput in) throws IOException
        {
            first = in.readUTF();
            second = in.readUTF();
        }
        //序列化,將IntPair轉化成使用流傳送的二進制
        public void write(DataOutput out) throws IOException
        {
            out.writeUTF(first);
            out.writeUTF(second);
        }
        //重載 compareTo 方法,進行組合鍵 key 的比較,該過程是默認行爲。
        //分組後的二次排序會隱式調用該方法。
        public int compareTo(IntPair o)
        {
            if (!first.equals(o.first) )
            {
                return first.compareTo(o.first);
            }
            else if (!second.equals(o.second))
            {
                return second.compareTo(o.second);
            }
            else
            {
                return 0;
            }
        }

        //新定義類應該重寫的兩個方法
        //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
        public int hashCode()
        {
            return first.hashCode() * 157 + second.hashCode();
        }
        public boolean equals(Object right)
        {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair)
            {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second) ;
            }
            else
            {
                return false;
            }
        }
    }
    /**
      * 分區函數類。根據first肯定Partition。
      */
    public static class FirstPartitioner extends Partitioner<IntPair, Text>
    {
        public int getPartition(IntPair key, Text value,int numPartitions)
        {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * 分組函數類。只要first相同就屬於同一個組。
     */
    /*//第一種方法,實現接口RawComparator
    public static class GroupingComparator implements RawComparator<IntPair> {
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
        //一個字節一個字節的比,直到找到一個不相同的字節,而後比這個字節的大小做爲兩個字節流的大小比較結果。
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
             return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                     b2, s2, Integer.SIZE/8);
        }
    }*/
    //第二種方法,繼承WritableComparator
    public static class GroupingComparator extends WritableComparator
    {
        protected GroupingComparator()
        {
            super(IntPair.class, true);
        }
        //Compare two WritableComparables.
        //  重載 compare:對組合鍵按第一個天然鍵排序分組
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }


    // 自定義map
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text>
    {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            lineArr = line.split("\t", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }
    // 自定義reduce
    //
    public static class Reduce extends Reducer<IntPair, Text, Text, Text>
    {
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        
        public void reduce(IntPair key, Iterable<Text> values,Context context) throws IOException, InterruptedException
        {
            context.write(SEPARATOR, null);
            for (Text val : values)
            {
                context.write(null, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
    {
        // 讀取hadoop配置
        Configuration conf = new Configuration();
        // 實例化一道做業
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        // Mapper類型
        job.setMapperClass(Map.class);
        // 再也不須要Combiner類型,由於Combiner的輸出類型<Text, IntWritable>對Reduce的輸入類型<IntPair, IntWritable>不適用
        //job.setCombinerClass(Reduce.class);
        // Reducer類型
        job.setReducerClass(Reduce.class);
        // 分區函數
        job.setPartitionerClass(FirstPartitioner.class);
        // 分組函數
        job.setGroupingComparatorClass(GroupingComparator.class);

        // map 輸出Key的類型
        job.setMapOutputKeyClass(IntPair.class);
        // map輸出Value的類型
        job.setMapOutputValueClass(Text.class);
        // rduce輸出Key的類型,是Text,由於使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // rduce輸出Value的類型
        job.setOutputValueClass(Text.class);

        // 將輸入的數據集分割成小數據塊splites,同時提供一個RecordReder的實現。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一個RecordWriter的實現,負責數據輸出。
        job.setOutputFormatClass(TextOutputFormat.class);

        // 輸入hdfs路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 輸出hdfs路徑
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

5 測試需求

假如咱們如今的需求是先按 cookieId 排序,而後按 time 排序,以便按 session 切分日誌

6 測試數據與結果

cookieId	time	url
2	12:12:34	2_hao123
3	09:10:34	3_baidu
1	15:02:41	1_google
3	22:11:34	3_sougou
1	19:10:34	1_baidu
2	15:02:41	2_google
1	12:12:34	1_hao123
3	23:10:34	3_soso
2	05:02:41	2_google

結果:
------------------------------------------------
1       12:12:34        1_hao123
1       15:02:41        1_google
1       19:10:34        1_baidu
------------------------------------------------
2       05:02:41        2_google
2       12:12:34        2_hao123
2       15:02:41        2_google
------------------------------------------------
3       09:10:34        3_baidu
3       22:11:34        3_sougou
3       23:10:34        3_soso

7 原理圖(點擊查看大圖)

八、推薦閱讀:

hive中使用標準sql實現分組內排序

http://superlxw1234.iteye.com/blog/1869612

Pig、Hive、MapReduce 解決分組 Top K 問題

http://my.oschina.net/leejun2005/blog/85187

九、REF:

mapreduce的二次排序 SecondarySort

http://blog.csdn.net/zyj8170/article/details/7530728

學會定製MapReduce裏的partition,sort和grouping,Secondary Sort Made Easy 進行二次排序

http://blog.sina.com.cn/s/blog_9bf980ad0100zk7r.html

Simple Moving Average, Secondary Sort, and MapReduce (Part 3)

http://blog.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/

https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage

MapReduce的排序和二次排序原理總結

http://hugh-wangp.iteye.com/blog/1491175

泛型value的二次排序 

http://wenku.baidu.com/view/a3826a235901020207409c47.html

http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/

相關文章
相關標籤/搜索