MapReduce的自制Writable分組輸出及組內排序

問題描述:
java

輸入文件格式以下:apache

name1    2app

name3    4ide

name1    6函數

name1    1oop

name3    3this

name1    0spa

要求輸出的文件格式以下:orm

name1    0,1,2,6對象

name3    3,4

要求是按照第一列分組,name1與name3也是按照順序排列的,組內升序排序

思路:

常規的輸出,沒法排序key所對應的多個值的順序。爲了排序組內中的值,須要將key與value放在同一個組。Job中有兩個方法setGroupingComparatorClass和setSortComparatorClass,能夠利用這兩個方法來實現組內排序。可是這些排序都是基於key的,則就要將key和value定義成組合鍵。

可是必需要保證第一列相同的所有都放在同一個分區中,則就須要自定義分區,分區的時候只考慮第一列的值。因爲partitioner僅僅能保證每個reducer接受同一個name的全部記錄,可是reducer仍然是經過鍵進行分組的分區,也就說該分區中仍是按照鍵來分紅不一樣的組,還須要分組只參考name值

先按照name分組,再在name中內部進行排序。

解決方法:

運用自定義組合鍵的策略,將name和1定義爲一個組合鍵。在分區的時候只參考name的值,即繼承partitioner。

 因爲要按照name分組,則就須要定義分組策略,而後設置setGroupingComparatorClass。

setGroupingComparatorClass主要定義哪些key能夠放置在一組,分組的時候會對組合鍵進行比較,因爲這裏只須要考慮組合鍵中的一個值,則定義實現一個WritableComparator,設置比較策略。

對於組內的排序,能夠利用setSortComparatorClass來實現,

這個方法主要用於定義key如何進行排序在它們傳遞給reducer以前,

這裏就能夠來進行組內排序。

具體代碼:

     Hadoop版本號:hadoop1.1.2

自定義組合鍵

package whut;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
//自定義組合鍵策略
//java基本類型數據
public class TextInt implements WritableComparable{
    //直接利用java的基本數據類型
    private String firstKey;
    private int secondKey;
    //必需要有一個默認的構造函數
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
                                                                                                                                                                         
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
    //map的鍵的比較就是根據這個方法來進行的
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextInt ti=(TextInt)o;
        //利用這個來控制升序或降序
        //this本對象寫在前面表明是升序
        //this本對象寫在後面表明是降序
        return this.getFirstKey().compareTo(ti.getFirstKey());
    }
}

分組策略

package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//主要就是對於分組進行排序,分組只按照組建鍵中的一個值進行分組
public class TextComparator extends WritableComparator {
    //必需要調用父類的構造器
    protected TextComparator() {
        super(TextInt.class,true);//註冊comparator
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextInt ti1=(TextInt)a;
        TextInt ti2=(TextInt)b;
        return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    }
}

組內排序策略

package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分組內部進行排序,按照第二個字段進行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextInt.class,true);
    }
    //這裏能夠進行排序的方式管理
    //必須保證是同一個分組的
    //a與b進行比較
    //若是a在前b在後,則會產生升序
    //若是a在後b在前,則會產生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextInt ti1=(TextInt)a;
        TextInt ti2=(TextInt)b;
        //首先要保證是同一個組內,同一個組的標識就是第一個字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
           return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
           return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1
    }
                                                                                                                                                         
}

分區策略

package whut;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//參數爲map的輸出類型
public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
    @Override
    public int getPartition(TextInt key, IntWritable value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

MapReduce策略

package whut;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//須要對數據進行分組以及組內排序的時候
public class SortMain extends Configured implements Tool{
    //這裏設置輸入文格式爲KeyValueTextInputFormat
    //name1 5
    //默認輸入格式都是Text,Text
    public static class GroupMapper extends
       Mapper<Text, Text, TextInt, IntWritable>  {
        public IntWritable second=new IntWritable();
        public TextInt tx=new TextInt();
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineKey=key.toString();
            String lineValue=value.toString();
            int lineInt=Integer.parseInt(lineValue);
            tx.setFirstKey(lineKey);
            tx.setSecondKey(lineInt);
            second.set(lineInt);
            context.write(tx, second);
        }
    }
    //設置reduce
    public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
    {
        @Override
        protected void reduce(TextInt key, Iterable<IntWritable> values,
               Context context)
                throws IOException, InterruptedException {
            StringBuffer sb=new StringBuffer();
            for(IntWritable val:values)
            {
                sb.append(val+",");
            }
            if(sb.length()>0)
            {
                sb.deleteCharAt(sb.length()-1);
            }
            context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
        }
    }
                                                                                                                                       
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf=getConf();
        Job job=new Job(conf,"SecondarySort");
        job.setJarByClass(SortMain.class);
        // 設置輸入文件的路徑,已經上傳在HDFS
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 設置輸出文件的路徑,輸出文件也存在HDFS中,可是輸出目錄不能已經存在
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
                                                                                                                                           
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReduce.class);
        //設置分區方法
        job.setPartitionerClass(KeyPartitioner.class);
                                                                                                                                           
        //下面這兩個都是針對map端的
        //設置分組的策略,哪些key能夠放置到一組中
        job.setGroupingComparatorClass(TextComparator.class);
        //設置key如何進行排序在傳遞給reducer以前.
        //這裏就能夠設置對組內如何排序的方法
        /*************關鍵點**********/
        job.setSortComparatorClass(TextIntComparator.class);
        //設置輸入文件格式
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //使用默認的輸出格式即TextInputFormat
        //設置map的輸出key和value類型
        job.setMapOutputKeyClass(TextInt.class);
        job.setMapOutputValueClass(IntWritable.class);
        //設置reduce的輸出key和value類型
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        int exitCode=job.isSuccessful()?0:1;
        return exitCode;
    }
                                                                                                                                       
    public static void main(String[] args)  throws Exception
    {
       int exitCode=ToolRunner.run(new SortMain(), args);
       System.exit(exitCode);
    }
}

注意事項

   1,設置分組排序按照升序仍是降序是在自定義WritableComparable中的compareTo()方法實現的,具體升序或者降序的設置在代碼中已經註釋說明

   2,設置組內值進行升序仍是降序的排序是在組內排序策略中的compare()方法註釋說明的。

   3,這裏同時最重要的一點是,將第二列即放在組合鍵中,又做爲value,這樣對於組合鍵排序也就至關於對於value進行排序了。

   4,在自定義組合鍵的時候,對於組合鍵中的數據的基本類型能夠採用Java的基本類型也能夠採用Hadoop的基本數據類型,對於Hadoop的基本數據類型必定要記得初始化new一個基本數據類型對象。對於組合鍵類,必需要有默認的構造方法。

相關文章
相關標籤/搜索