問題描述:
java
輸入文件格式以下:apache
name1 2app
name3 4ide
name1 6函數
name1 1oop
name3 3this
name1 0spa
要求輸出的文件格式以下:orm
name1 0,1,2,6對象
name3 3,4
要求是按照第一列分組,name1與name3也是按照順序排列的,組內升序排序。
思路:
常規的輸出,沒法排序key所對應的多個值的順序。爲了排序組內中的值,須要將key與value放在同一個組。Job中有兩個方法setGroupingComparatorClass和setSortComparatorClass,能夠利用這兩個方法來實現組內排序。可是這些排序都是基於key的,則就要將key和value定義成組合鍵。
可是必需要保證第一列相同的所有都放在同一個分區中,則就須要自定義分區,分區的時候只考慮第一列的值。因爲partitioner僅僅能保證每個reducer接受同一個name的全部記錄,可是reducer仍然是經過鍵進行分組的分區,也就說該分區中仍是按照鍵來分紅不一樣的組,還須要分組只參考name值
先按照name分組,再在name中內部進行排序。
解決方法:
運用自定義組合鍵的策略,將name和1定義爲一個組合鍵。在分區的時候只參考name的值,即繼承partitioner。
因爲要按照name分組,則就須要定義分組策略,而後設置setGroupingComparatorClass。
setGroupingComparatorClass主要定義哪些key能夠放置在一組,分組的時候會對組合鍵進行比較,因爲這裏只須要考慮組合鍵中的一個值,則定義實現一個WritableComparator,設置比較策略。
對於組內的排序,能夠利用setSortComparatorClass來實現,
這個方法主要用於定義key如何進行排序在它們傳遞給reducer以前,
這裏就能夠來進行組內排序。
具體代碼:
Hadoop版本號:hadoop1.1.2
自定義組合鍵
package whut; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; //自定義組合鍵策略 //java基本類型數據 public class TextInt implements WritableComparable{ //直接利用java的基本數據類型 private String firstKey; private int secondKey; //必需要有一個默認的構造函數 public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } //map的鍵的比較就是根據這個方法來進行的 @Override public int compareTo(Object o) { // TODO Auto-generated method stub TextInt ti=(TextInt)o; //利用這個來控制升序或降序 //this本對象寫在前面表明是升序 //this本對象寫在後面表明是降序 return this.getFirstKey().compareTo(ti.getFirstKey()); } }
分組策略
package whut; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //主要就是對於分組進行排序,分組只按照組建鍵中的一個值進行分組 public class TextComparator extends WritableComparator { //必需要調用父類的構造器 protected TextComparator() { super(TextInt.class,true);//註冊comparator } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextInt ti1=(TextInt)a; TextInt ti2=(TextInt)b; return ti1.getFirstKey().compareTo(ti2.getFirstKey()); } }
組內排序策略
package whut; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //分組內部進行排序,按照第二個字段進行排序 public class TextIntComparator extends WritableComparator { public TextIntComparator() { super(TextInt.class,true); } //這裏能夠進行排序的方式管理 //必須保證是同一個分組的 //a與b進行比較 //若是a在前b在後,則會產生升序 //若是a在後b在前,則會產生降序 @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextInt ti1=(TextInt)a; TextInt ti2=(TextInt)b; //首先要保證是同一個組內,同一個組的標識就是第一個字段相同 if(!ti1.getFirstKey().equals(ti2.getFirstKey())) return ti1.getFirstKey().compareTo(ti2.getFirstKey()); else return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1 } }
分區策略
package whut; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; //參數爲map的輸出類型 public class KeyPartitioner extends Partitioner<TextInt, IntWritable> { @Override public int getPartition(TextInt key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; } }
MapReduce策略
package whut; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //須要對數據進行分組以及組內排序的時候 public class SortMain extends Configured implements Tool{ //這裏設置輸入文格式爲KeyValueTextInputFormat //name1 5 //默認輸入格式都是Text,Text public static class GroupMapper extends Mapper<Text, Text, TextInt, IntWritable> { public IntWritable second=new IntWritable(); public TextInt tx=new TextInt(); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String lineKey=key.toString(); String lineValue=value.toString(); int lineInt=Integer.parseInt(lineValue); tx.setFirstKey(lineKey); tx.setSecondKey(lineInt); second.set(lineInt); context.write(tx, second); } } //設置reduce public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text> { @Override protected void reduce(TextInt key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { StringBuffer sb=new StringBuffer(); for(IntWritable val:values) { sb.append(val+","); } if(sb.length()>0) { sb.deleteCharAt(sb.length()-1); } context.write(new Text(key.getFirstKey()), new Text(sb.toString())); } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf=getConf(); Job job=new Job(conf,"SecondarySort"); job.setJarByClass(SortMain.class); // 設置輸入文件的路徑,已經上傳在HDFS FileInputFormat.addInputPath(job, new Path(args[0])); // 設置輸出文件的路徑,輸出文件也存在HDFS中,可是輸出目錄不能已經存在 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(GroupMapper.class); job.setReducerClass(GroupReduce.class); //設置分區方法 job.setPartitionerClass(KeyPartitioner.class); //下面這兩個都是針對map端的 //設置分組的策略,哪些key能夠放置到一組中 job.setGroupingComparatorClass(TextComparator.class); //設置key如何進行排序在傳遞給reducer以前. //這裏就能夠設置對組內如何排序的方法 /*************關鍵點**********/ job.setSortComparatorClass(TextIntComparator.class); //設置輸入文件格式 job.setInputFormatClass(KeyValueTextInputFormat.class); //使用默認的輸出格式即TextInputFormat //設置map的輸出key和value類型 job.setMapOutputKeyClass(TextInt.class); job.setMapOutputValueClass(IntWritable.class); //設置reduce的輸出key和value類型 //job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args) throws Exception { int exitCode=ToolRunner.run(new SortMain(), args); System.exit(exitCode); } }
注意事項
1,設置分組排序按照升序仍是降序是在自定義WritableComparable中的compareTo()方法實現的,具體升序或者降序的設置在代碼中已經註釋說明
2,設置組內值進行升序仍是降序的排序是在組內排序策略中的compare()方法註釋說明的。
3,這裏同時最重要的一點是,將第二列即放在組合鍵中,又做爲value,這樣對於組合鍵排序也就至關於對於value進行排序了。
4,在自定義組合鍵的時候,對於組合鍵中的數據的基本類型能夠採用Java的基本類型也能夠採用Hadoop的基本數據類型,對於Hadoop的基本數據類型必定要記得初始化new一個基本數據類型對象。對於組合鍵類,必需要有默認的構造方法。