本算法教程系列創建在您已經有了spark以及Hadoop的開發基礎,若是沒有的話,請觀看本博客的hadoop相關教程或者自行學習,代碼會在博客文檔寫到必定程度統一放到github下。java
二次排序是指在reducer階段對與某個鍵關聯的值進行排序,也叫做值轉換。git
MapReduce框架會自動對映射器生成的鍵進行排序。這說明,在reducer階段準備執行以前,他得到的數據必然是按鍵有序的(而不是值)。傳入各個reducer的值並非有序的,他們的順序咱們沒法肯定(取決於資源調配過程的處理順序)。咱們的實際需求中,對值的排序狀況是很常見的。爲了解決這種問題,咱們須要用到二次排序的設計模式。github
二次排序問題是指在reducer階段對與某個鍵關聯的值進行排序,也叫做值轉換。爲了理解方便,咱們定義通常的MapReduce處理過程的公式以下: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ reduce(key_{2},list(value_{2})) => list(key_{3},value_{3}) $$算法
咱們對這兩個公式進行說明:首先,map函數接受一個k1-v1,而後他會輸出任何數量的k2-v2對。接下來,reduce函數接收另外一個k-list(v)做爲輸入,而後將其進行處理,輸出另外一個k-v。apache
顯然reduce函數的輸入list(value_{2})中的{v1,v2,vn....}是一個無序的,二次排序的目的就是讓他們變得有序!所以,咱們能夠根據上面的公式的模式來定義二次排序的公式,以下所示: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ sort(V_{1},V_{2}....V_{n}) => ({S_{1},S{2}}...S_{n}) $$ V表明無序變量,S表明有序變量。設計模式
和以前同樣,學以至用。既然理解了二次排序的概念,咱們就來經過一些可以觸類旁通的用例來掌握二次排序的設計模式。app
假定咱們有以下的溫控數據。框架
2018,01,01,10 2018,01,02,5 2018,01,03,3 2018,01,04,12 2016,11,05,20 2016,11,15,30 2016,03,25,11 2016,04,22,19 2015,06,11,22 2015,06,10,33 2015,07,08,21 2015,02,06,5 2017,11,05,5 2017,11,04,0 2017,02,02,3 2017,02,03,9 2014,06,11,22 2014,06,10,33 2014,07,08,21 2014,07,06,5
該數據是按逗號分隔的每一行,分別表明年、月、日以及當天的溫度。用公式表現爲 $$ L_{0}=Year,L_{1}=Month,L_{2}=Day,L_{3}=Temperature,S=',' $$ 其中L表明列的意思,下標表明列因此(從0計數),S表明分隔符號,這裏的分隔符是英文逗號,整個文件以UTF-8編碼(若數據不含中文能夠沒必要在乎)。dom
如今咱們要求處理這段數據,按年月排序倒敘輸出的同時(忽略天氣),統計各個月以內溫度的變化趨勢,一樣按溫度的降序排列,輸出在同一行,下面是輸出範例:ide
2018-01 12,10,5,3 2017-11 5,0 2017-02 9,3 2016-11 30,20 ... ...
需求公式以下: $$ V(List(dateAndTemperature)) => S(List(date, S(Listtemparature))) $$ 其中date刪除了日期。
第一步,肯定Map階段生成的key,顯然,咱們得將其定義爲包含年月以及溫度的WritableBean。
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Date; public class DateTemperature implements Writable { // year month private String yearMonth; // day private String day; // temperature private Integer temperature; // reflect must be need. public DateTemperature() { } public DateTemperature(String yearMonth, String day, Integer temperature) { this.yearMonth = yearMonth; this.day = day; this.temperature = temperature; } public String getYearMonth() { return yearMonth; } public void setYearMonth(String yearMonth) { this.yearMonth = yearMonth; } public String getDay() { return day; } public void setDay(String day) { this.day = day; } public Integer getTemperature() { return temperature; } public void setTemperature(Integer temperature) { this.temperature = temperature; } // 序列化接口 public void write(DataOutput out) throws IOException { out.writeUTF(yearMonth); out.writeUTF(day); out.writeInt(temperature); } public void readFields(DataInput in) throws IOException { yearMonth = in.readUTF(); day = in.readUTF(); temperature = in.readInt(); } @Override public String toString() { return yearMonth + ","+ temperature; } }
因爲咱們要將DateTemperature定義爲Mapper的輸出key,所以,咱們還須要定製其排序邏輯,讓mapper階段的shuffle爲咱們自動排序。
按照需求,應該優先按年月進行排序,再而後考慮溫度,即所謂的thenby方式。注意與二次排序的設計模式區分開。
public class DateTemperature implements Writable, WritableComparable<DateTemperature> { ... // 排序 public int compareTo(DateTemperature o) { int result = this.getYearMonth().compareTo(o.getYearMonth()); if(result == 0){ result = this.getTemperature().compareTo(o.getTemperature()); } // 降序排序。若要升序排序,直接返回result. return -1 * result; } }
肯定了Bean以後,咱們就能夠來定製Mapper的邏輯了。代碼以下:
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class TemperatureMapper extends Mapper<LongWritable, Text, DateTemperature, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tokens = line.split(","); String yearMonth = tokens[0] + "-" + tokens[1]; // tokens[2] is Day, it isn't necessary. String day = tokens[2]; Integer temperature = Integer.valueOf(tokens[3]); // k->bean v-temperature context.write(new DateTemperature(yearMonth,tokens[2], temperature),new IntWritable(temperature)); } }
能夠看到,Mapper輸出的是(Bean-溫度)的鍵值對。若是咱們不對輸出的key作任何處理,顯然reduce任務會根據key對象的hash值肯定處理次數(分組)。在這種狀況下,咱們須要將年月相同的記錄聚集到一個reducer進行處理。
要實現該功能,就須要使用自定義的分組比較器。
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TemperatureGroupComparator extends WritableComparator { public TemperatureGroupComparator(){ super(DateTemperature.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { return ((DateTemperature) a).getYearMonth().compareTo(((DateTemperature)b).getYearMonth()); } }
該分組比較器取代mapreduce的默認分組比較行爲(按key的hash),修改成按key中的年月字典序進行比較,這樣,就能夠將他們聚集在一塊兒,統一由一個reducer處理了。
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class TemperatureReducer extends Reducer<DateTemperature, IntWritable,Text, Text> { @Override protected void reduce(DateTemperature key, Iterable<IntWritable> temperatures, Context context) throws IOException, InterruptedException { StringBuilder values = new StringBuilder(); for (IntWritable temperature : temperatures) { values.append(temperature); values.append(","); } // delete the last symbol values.deleteCharAt(values.length() - 1); // output like 2018-01 22,23... context.write(new Text(key.getYearMonth()), new Text(values.toString())); } }
reducer的邏輯比較簡單,將發送到本身的數據進行彙總,按需求的格式進行輸出。注意此處的temperatures,已是排好序的了。
最後,咱們編寫Driver類,編寫常規的mapreduce模板代碼。
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.UUID; public class TemperatureDriver { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(TemperatureDriver.class); job.setMapperClass(TemperatureMapper.class); job.setReducerClass(TemperatureReducer.class); job.setMapOutputKeyClass(DateTemperature.class); job.setMapOutputValueClass(IntWritable.class); //job.setPartitionerClass(TemperaturePartition.class); //job.setNumReduceTasks(2); // add your group comparator class. job.setGroupingComparatorClass(TemperatureGroupComparator.class); job.setOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); FileInputFormat.setInputPaths(job,new Path("temperature.txt")); FileOutputFormat.setOutputPath(job,new Path("output_" + UUID.randomUUID()) ); System.exit(job.waitForCompletion(true)? 1:0); } }
最後編寫千篇一概的驅動類,別忘了使用job.setGroupingComparatorClass(TemperatureGroupComparator.class);
往任務中註冊咱們的自定義分組比較器,最後執行代碼,能夠得到一個輸出文件,輸出結果以下所示:
2018-01 12,10,5,3 2017-11 5,0 2017-02 9,3 2016-11 30,20 2016-04 19 2016-03 11 2015-07 21 2015-06 33,22 2015-02 5 2014-07 21,5 2014-06 33,22
第二節咱們快速的搭建了一個解決方案,有一些細節須要特別的分析一下。在這裏,咱們須要對Map輸出的中間鍵——DateTemperature進行分析。
爲了實現二次排序,咱們就須要控制DateTemperature的排序、以及reducer處理鍵的順序。咱們將須要做用的日期以及溫度組合了起來,造成了一個組合鍵,他們之間的關係以下圖所示:
天然值也能夠理解爲鍵值對去除天然鍵以後的剩餘部分。
二次排序的關鍵問題在於,如何找出天然鍵、組合鍵,以及肯定天然值。
本節沒有使用分區功能,全部的數據都發往同一個reducer任務上,好比下面的日誌:
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce > reduce 2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local1691658871_0001_r_000000_0' done. 2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local1691658871_0001_r_000000_0 2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce task executor complete.
都由任務編號爲attempt_local1691658871_0001_r_000000_0的reducer task處理了。
接下來咱們指定多個分區,在每一個分區中,後臺線程按鍵進行排序,最後,最終發往不一樣的reducer進行處理,這很大程度的減輕了單臺機器(若是是集羣)的負擔。
package com.zhaoyi.book.algro.bean; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class TemperaturePartition extends Partitioner<DateTemperature, IntWritable> { @Override public int getPartition(DateTemperature dateTemperature, IntWritable temperature, int numPartitions) { return Math.abs(dateTemperature.getYearMonth().hashCode() % numPartitions); } }
咱們按天然鍵對reducer任務數取餘的結果(絕對值保證不爲負)做爲分區的編號。
job.setNumReduceTasks(2);
這樣,就能夠進行一步優化了,最後彙總的文件會有兩份,分別對應不一樣的分區結果,請留意。
二次排序是一個控制reducer對值進行排序的設計模式。當咱們須要排序的數據不止一列以上,或者須要對值進行排序,那麼能夠考慮這種模式。設計模式以下: