Secondary sort mainly involves the following hooks.
Before 0.20.0 (the old API):
setPartitionerClass
setOutputKeyComparatorClass
setOutputValueGroupingComparator
From 0.20.0 on (the new API):
job.setPartitionerClass(Partitioner p);
job.setSortComparatorClass(RawComparator c);
job.setGroupingComparatorClass(RawComparator c);
The example below only uses setGroupingComparatorClass; a sketch of how all three attach to a Job follows this list.
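For orientation, here is a minimal sketch of how the three new-API hooks attach to a Job. FirstPartitioner and GroupingComparator are the classes defined in the full example below; KeyComparator is a hypothetical name, since this example relies on the key's own compareTo instead:

Job job = new Job(conf, "secondarysort");
// Decides which reducer each composite key is sent to (by the first field only).
job.setPartitionerClass(FirstPartitioner.class);
// Would impose the full sort order within a partition; left out here because
// the example lets IntPair.compareTo define that ordering.
// job.setSortComparatorClass(KeyComparator.class);
// Decides which keys are fed into a single reduce() call (first field only).
job.setGroupingComparatorClass(GroupingComparator.class);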
I rewrote the SecondarySort example that ships with the MapReduce source; it is essentially unchanged.
The map and reduce classes defined in this example are shown below; the key point is how their input and output types are declared (Java generics):
public static class Map extends Mapper<LongWritable, Text, IntPair, Text>
public static class Reduce extends Reducer<IntPair, Text, Text, Text>
The goal is to sort by the first field, and then sort rows that share the same first field by the second field, without disturbing the result of the first sort. For example:
echo "3 b
1 c
2 a
1 d
3 a"|sort -k1 -k2
1 c
1 d
2 a
3 a
3 b
2.1 The partitioner class. This is the first comparison of the key.
public static class FirstPartitioner extends Partitioner<IntPair, Text>
This example does not use a key comparator class; instead it relies on the compareTo method implemented by the key itself:
package SecondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SecondarySort {

    // A user-defined key class should implement the WritableComparable interface.
    public static class IntPair implements WritableComparable<IntPair> {
        String first;
        String second;

        /**
         * Set the left and right values.
         */
        public void set(String left, String right) {
            first = left;
            second = right;
        }

        public String getFirst() {
            return first;
        }

        public String getSecond() {
            return second;
        }

        // Deserialization: rebuild the IntPair from the binary in the stream.
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }

        // Serialization: turn the IntPair into binary that can be sent over the stream.
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        // Override compareTo to compare the composite key; this is the default behavior.
        // The secondary sort after grouping calls this method implicitly.
        public int compareTo(IntPair o) {
            if (!first.equals(o.first)) {
                return first.compareTo(o.first);
            } else if (!second.equals(o.second)) {
                return second.compareTo(o.second);
            } else {
                return 0;
            }
        }

        // Two methods a newly defined key class should override.
        // The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce).
        public int hashCode() {
            return first.hashCode() * 157 + second.hashCode();
        }

        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second);
            } else {
                return false;
            }
        }
    }

    /**
     * Partitioner class: the partition is determined by `first`.
     */
    public static class FirstPartitioner extends Partitioner<IntPair, Text> {
        public int getPartition(IntPair key, Text value, int numPartitions) {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * Grouping comparator class: keys with the same `first` belong to the same group.
     */
    /* // Option 1: implement the RawComparator interface (kept from the int-based version).
    public static class GroupingComparator implements RawComparator<IntPair> {
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
        // Compare byte by byte until a differing byte is found; that byte decides
        // the ordering of the two byte streams.
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                                   b2, s2, Integer.SIZE / 8);
        }
    } */
    // Option 2: extend WritableComparator.
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }

        // Compare two WritableComparables.
        // Override compare: group composite keys by the first (natural) key.
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }

    // Custom mapper.
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text> {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            lineArr = line.split("\t", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }

    // Custom reducer.
    public static class Reduce extends Reducer<IntPair, Text, Text, Text> {
        private static final Text SEPARATOR =
                new Text("------------------------------------------------");

        public void reduce(IntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            for (Text val : values) {
                context.write(null, val);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Read the Hadoop configuration.
        Configuration conf = new Configuration();
        // Instantiate a job.
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        // Mapper class.
        job.setMapperClass(Map.class);
        // No Combiner here: the Reducer's output types <Text, Text> do not match
        // the reduce input types <IntPair, Text>, so Reduce cannot double as a Combiner.
        // job.setCombinerClass(Reduce.class);
        // Reducer class.
        job.setReducerClass(Reduce.class);
        // Partitioner.
        job.setPartitionerClass(FirstPartitioner.class);
        // Grouping comparator.
        job.setGroupingComparatorClass(GroupingComparator.class);
        // Map output key type.
        job.setMapOutputKeyClass(IntPair.class);
        // Map output value type.
        job.setMapOutputValueClass(Text.class);
        // Reduce output key type: Text, because the OutputFormatClass is TextOutputFormat.
        job.setOutputKeyClass(Text.class);
        // Reduce output value type.
        job.setOutputValueClass(Text.class);
        // Splits the input data set into splits and provides a RecordReader implementation.
        job.setInputFormatClass(TextInputFormat.class);
        // Provides a RecordWriter implementation responsible for writing the output.
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input HDFS path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output HDFS path (deleted first if it already exists).
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
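As noted above, this example leans on IntPair.compareTo for the sort within each partition. For contrast, here is a minimal sketch of the explicit key-comparator route via job.setSortComparatorClass (not part of the original example; KeyComparator is a hypothetical name):

public static class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(IntPair.class, true); // true: create key instances for deserialization
    }

    // Full ordering: by the first field, then by the second field.
    public int compare(WritableComparable w1, WritableComparable w2) {
        IntPair ip1 = (IntPair) w1;
        IntPair ip2 = (IntPair) w2;
        int cmp = ip1.getFirst().compareTo(ip2.getFirst());
        return cmp != 0 ? cmp : ip1.getSecond().compareTo(ip2.getSecond());
    }
}

It would be registered in main() with job.setSortComparatorClass(KeyComparator.class); the grouping comparator then still compares only the first field.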
Suppose the requirement is now to sort by cookieId first and then by time, so that the log can be split into sessions. Because the grouping comparator compares only the cookieId, each reduce() call receives one cookie's records already sorted by time, and the reducer writes a separator line before each group:
cookieId  time      url
2         12:12:34  2_hao123
3         09:10:34  3_baidu
1         15:02:41  1_google
3         22:11:34  3_sougou
1         19:10:34  1_baidu
2         15:02:41  2_google
1         12:12:34  1_hao123
3         23:10:34  3_soso
2         05:02:41  2_google

Result:
------------------------------------------------
1  12:12:34  1_hao123
1  15:02:41  1_google
1  19:10:34  1_baidu
------------------------------------------------
2  05:02:41  2_google
2  12:12:34  2_hao123
2  15:02:41  2_google
------------------------------------------------
3  09:10:34  3_baidu
3  22:11:34  3_sougou
3  23:10:34  3_soso
Sorting within groups using standard SQL in Hive
http://superlxw1234.iteye.com/blog/1869612
Solving the grouped Top-K problem with Pig, Hive, and MapReduce
http://my.oschina.net/leejun2005/blog/85187
MapReduce secondary sort: SecondarySort
http://blog.csdn.net/zyj8170/article/details/7530728
Customizing partition, sort, and grouping in MapReduce: Secondary Sort Made Easy
http://blog.sina.com.cn/s/blog_9bf980ad0100zk7r.html
Simple Moving Average, Secondary Sort, and MapReduce (Part 3)
http://blog.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/
https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage
A summary of how MapReduce sorting and secondary sorting work
http://hugh-wangp.iteye.com/blog/1491175
Secondary sort with a generic value