1、背景java
按照年份升序排序,同時每年中溫度降序排序apache
data文件爲1949年-1955年天天的溫度數據。app
要求:一、計算1949-1955年,每一年溫度最高的時間ide
二、計算1949-1955年,每一年溫度最高的十天函數
1949-10-01 14:21:02 34℃ 1949-10-02 14:01:02 36℃ 1950-01-01 14:21:02 32℃ 1950-10-01 11:01:02 37℃ 1951-10-01 14:21:02 23℃ 1950-10-02 17:11:02 41℃ 1950-10-01 18:20:02 27℃ 1951-07-01 14:01:02 45℃ 1951-07-02 13:21:02 46℃
2、二次排序原理oop
默認狀況下,Map 輸出的結果會對 Key 進行默認的排序,可是有時候須要對 Key 排序的同時再對 Value 進行排序,這時候就要用到二次排序了。下面讓咱們來介紹一下什麼是二次排序。this
2.1 Map起始階段spa
在Map階段,使用job.setInputFormatClass()定義的InputFormat,將輸入的數據集分割成小數據塊split,同時InputFormat提供一個RecordReader的實現。在這裏咱們使用的是TextInputFormat,它提供的RecordReader會將文本的行號做爲Key,這一行的文本做爲Value。這就是自定 Mapper的輸入是<LongWritable,Text> 的緣由。而後調用自定義Mapper的map方法,將一個個<LongWritable,Text>鍵值對輸入給Mapper的map方法code
2.2 Map最後階段orm
在Map階段的最後,會先調用job.setPartitionerClass()對這個Mapper的輸出結果進行分區,每一個分區映射到一個Reducer。每一個分區內又調用job.setSortComparatorClass()設置的Key比較函數類排序。能夠看到,這自己就是一個二次排序。若是沒有經過job.setSortComparatorClass()設置 Key比較函數類,則使用Key實現的compareTo()方法
2.3 Reduce階段
在Reduce階段,reduce()方法接受全部映射到這個Reduce的map輸出後,也會調用job.setSortComparatorClass()方法設置的Key比較函數類,對全部數據進行排序。而後開始構造一個Key對應的Value迭代器。這時就要用到分組,使用 job.setGroupingComparatorClass()方法設置分組函數類。只要這個比較器比較的兩個Key相同,它們就屬於同一組,它們的 Value放在一個Value迭代器,而這個迭代器的Key使用屬於同一個組的全部Key的第一個Key。最後就是進入Reducer的 reduce()方法,reduce()方法的輸入是全部的Key和它的Value迭代器,一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。
3、二次排序流程
在本例中要比較兩次。先按照第年份排序,而後再對年份相同的按照溫度排序。根據這一點,咱們能夠構造一個複合類KeyPair ,它有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。二次排序的流程分爲如下幾步。
3.1 自定義key
全部自定義的key應該實現接口WritableComparable,由於它是可序列化的而且可比較的。WritableComparable 的內部方法以下所示
// 反序列化,從流中的二進制轉換成IntPair public void readFields(DataInput in) throws IOException // 序列化,將IntPair轉化成使用流傳送的二進制 public void write(DataOutput out) // key的比較 public int compareTo(IntPair o) // 默認的分區類 HashPartitioner,使用此方法 public int hashCode() // 默認實現 public boolean equals(Object right)
3.2 自定義分區
自定義分區函數類FirstPartitioner,是key的第一次比較,完成對全部key的排序。
public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>
在job中使用setPartitionerClasss()方法設置Partitioner
job.setPartitionerClasss(FirstPartitioner.Class);
3.3 自定義排序類
這是Key的第二次比較,對全部的Key進行排序,即同時完成IntPair中的first和second排序。該類是一個比較器,能夠經過兩種方式實現。
1) 繼承WritableComparator。
public static class KeyComparator extends WritableComparator
必須有一個構造函數,而且重載如下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 實現接口 RawComparator。
上面兩種實現方式,在Job中,能夠經過setSortComparatorClass()方法來設置Key的比較類。
job.setSortComparatorClass(KeyComparator.Class);
3.4 自定義分組類
在Reduce階段,構造一個與 Key 相對應的 Value 迭代器的時候,只要first相同就屬於同一個組,放在一個Value迭代器。定義這個比較器,能夠有兩種方式。
分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。
1) 繼承WritableComparator。
public static class KeyComparator extends WritableComparator
必須有一個構造函數,而且重載如下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 實現接口 RawComparator。
上面兩種實現方式,在Job中,能夠經過setSortComparatorClass()方法來設置Key的比較類。
job.setGroupingComparatorClass(GroupingComparator.Class);
另外注意的是,若是reduce的輸入與輸出不是同一種類型,則 Combiner和Reducer 不能共用 Reducer 類,由於 Combiner 的輸出是 reduce 的輸入。除非從新定義一個Combiner。
4、代碼實現
思路:
一、按照年份升序排序,同時每年中溫度降序排序
二、按照年份分組,每年對應一個reduce任務
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class KeyPair implements WritableComparable<KeyPair> { private int year; //年份 private int hot; //溫度 public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getHot() { return hot; } public void setHot(int hot) { this.hot = hot; } @Override public void readFields(DataInput in) throws IOException { this.year = in.readInt(); this.hot = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(hot); } //重寫compareTo方法,用做key的比較,先比較年份,年份相同再比較溫度 @Override public int compareTo(KeyPair o) { int y = Integer.compare(year, o.getYear()); if(y == 0){ return Integer.compare(hot, o.getHot()); } return y; } @Override public String toString() { return year+"\t"+hot; } }
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class FirstPartitioner extends Partitioner<KeyPair, Text> { @Override public int getPartition(KeyPair key, Text value, int nums) { //按照年份分區,乘127是爲了分散開,nums是reduce數量 return (key.getYear()*127 & Integer.MAX_VALUE) % nums; } }
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class SortKey extends WritableComparator { public SortKey() { super(KeyPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair k1 = (KeyPair)a; KeyPair k2 = (KeyPair)b; //先比較年份 int pre = Integer.compare(k1.getYear(), k2.getYear()); if(pre != 0){ return pre; } //年份相同比較溫度 //溫度倒序 return -Integer.compare(k1.getHot(), k2.getHot()); } }
分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class GroupComparator extends WritableComparator { protected GroupComparator() { super(KeyPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair k1 = (KeyPair)a; KeyPair k2 = (KeyPair)b; //按照年份分組,每年一個reduce,不考慮溫度 return Integer.compare(k1.getYear(), k2.getYear()); } }
import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private KeyPair k = new KeyPair(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //keypair做爲key,每一行文本做爲value String line = new String(value.getBytes(), 0, value.getLength(), "GBK"); String[] tmp = line.split("\t"); System.out.println(tmp[0]+"\t"+tmp[1]); if(tmp.length>=2){ try { Date date = sdf.parse(tmp[0]); Calendar cal = Calendar.getInstance(); cal.setTime(date); int year = cal.get(1); k.setYear(year); } catch (ParseException e) { e.printStackTrace(); } int hot = Integer.parseInt(tmp[1].substring(0, tmp[1].indexOf("℃"))); k.setHot(hot); context.write(k, value); } } }
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<KeyPair, Text, KeyPair,Text> { @Override protected void reduce(KeyPair key, Iterable<Text> value,Context context) throws IOException, InterruptedException { for(Text t : value){ context.write(key, t); } } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class YearHot { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "year hot sort"); job.setJarByClass(YearHot.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(3); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(SortKey.class); job.setGroupingComparatorClass(GroupComparator.class); job.setOutputKeyClass(KeyPair.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(GBKOutputFormat.class); FileInputFormat.addInputPath(job, new Path("hdfs://192.168.228.134:/usr/input/data.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.228.134:/usr/output")); System.exit(job.waitForCompletion(true)?0:1); } }