shuffle機制中的分組排序(Group)是一個創建在Reducer階段的處理過程。參看下圖的第15步驟。經過這一步驟,咱們能夠修改Reducer斷定key的邏輯,按照咱們的業務邏輯去定義那些key應該屬於同一類型的分組,從而決定那些數據走向同一個reducer。java
須要注意的是,現實開發中常常使用的分組其實就是分區功能,本節講述的是Reducer階段根據key分組的過程。apache
默認狀況下MapReduce的分組階段會根據咱們提供的key進行排序,而後將排序結果相等的放到一次reducer循環中(代碼上的體現)。而該排序過程,其實能夠由咱們去定義,也就說,將排序的結果咱們須要自定義一個GroupComparator.app
咱們能夠經過輸出日誌查看當前的分組數ide
... Reduce input groups=3 ...
例如,上述狀況下分組爲3。oop
若是咱們不特殊指定分組,那麼分組數會按照key的對應WritableComparator的實現類邏輯進行排序,並按照key進行分組排序,同一組key的數據會進入reducer方法中進行處理,一組執行一次。this
以wordCount舉例,在不考慮自定義分區(partition)的狀況下(如今是默認一個分區)。咱們reducer輸入的key值是Text類型,那麼他的key排序邏輯就應該是Text的Comparator。參見源碼:編碼
@InterfaceAudience.Public @InterfaceStability.Stable public class Text extends BinaryComparable
BinaryComparable
,即按照字典順序進行排序。也就是說,相同的JAVA類型的String(String對應Hadoop的序列化Text類型)Key會被劃分爲一個分組,咱們來驗證一下這個想法,例如以下的輸入數據日誌
about about areya akuya mywife
按照咱們以前的理論,分組數應該是4,即兩個about會被劃分爲一組,執行一次處理。運行,查看分組日誌:code
... Reduce input groups=4 ...
就是這樣。orm
接下來,咱們探討一下如何改變分組數。其實答案顯而易見,那就是定義Key類型的Comparator邏輯,實現本身的排序行爲。
咱們來看看Hadoop類型組件中使用的Comparator有哪些,每一個類型都有本身的Comparator實現,咱們能夠進入源碼一一查看。
爲了練習分組排序,接下來咱們運行一個案例。
需求:獲取每種訂單號中消費最多的一條記錄,並按訂單號的字典序排序
輸入數據:第一列爲訂單號,第二列爲雜項,第三列爲消費值。
Order_0000001 Pdt_01 222.8 Order_0000002 Pdt_05 722.4 Order_0000001 Pdt_05 25.8 Order_0000003 Pdt_01 222.8 Order_0000003 Pdt_01 33.8 Order_0000002 Pdt_03 522.8 Order_0000002 Pdt_04 122.4 Order_0000002 Pdt_04 1220.4 Order_0000002 Pdt_04 1422.4 Order_0000002 Pdt_04 1522.4 Order_0000002 Pdt_04 1622.4 Order_0000001 Pdt_04 1000.4
一、首先篩選掉雜項Pdt_01
;
二、定義訂單Bean,將其做爲Key並先根據id排序,再根據消費進行排序(value爲空值);
難題出現了,咱們的key爲訂單Bean,以他做爲key的話,咱們沒法關聯到Order_0000001的第一條數據是咱們想要的,怎麼作?顯然,咱們須要將訂單id相同的一組訂單數據放在一塊兒,這樣咱們只須要取第一條數據就能夠了。
也就是要欺騙reducer,不要以Bean對象做爲參考,而已bean對象的orderId做爲參考。
問題迎刃而解,接下來,開始編碼。
也能夠考慮結果按訂單號進行分區,實現分組效果,這是業務開發中經常使用的方式。同時,該問題的解決方案比較極端,還有更多優雅的處理方式能夠選擇,這裏只是爲了演示reducer的迭代邏輯,所以行此下策。
(1) Bean
package com.zhaoyi.order; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class OrderBean implements WritableComparable<OrderBean> { private String orderId; private double price; // second order public int compareTo(OrderBean o) { // according to order id. int result = this.orderId.compareTo(o.getOrderId()); // then by price if(result == 0){ result = this.getPrice() > o.getPrice()? -1:1; } return result; } public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeDouble(price); } public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.price = in.readDouble(); } public OrderBean() { } public void set(String orderId, double price){ this.orderId = orderId; this.price = price; } public OrderBean(String orderId, double price) { this.orderId = orderId; this.price = price; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { return orderId + "\t" + price ; } }
(2)Mapper
package com.zhaoyi.order; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { OrderBean orderBean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] strings = line.split("\t"); orderBean.set(strings[0], Double.parseDouble(strings[2])); context.write(orderBean, NullWritable.get()); } }
(3)Reducer
package com.zhaoyi.order; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class OrderReducer extends Reducer< OrderBean, NullWritable, OrderBean, NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 注意此處迭代的代碼,每當咱們迭代一次, // key的值編寫切換到下一個分組的key值,是什麼呢?您能夠嘗試一下就知道了。 // System.out.println("------------"); // System.out.print(key.getOrderId()); // int count = 0; // for (NullWritable value:values) { // count++; // } // System.out.println("擁有"+ count + "條數據."); context.write(key, NullWritable.get()); } }
(4)GroupComparator
package com.zhaoyi.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupComparator extends WritableComparator { public OrderGroupComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aa = (OrderBean) a; OrderBean bb = (OrderBean) b; return aa.getOrderId().compareTo(bb.getOrderId()); } }
(5)驅動類,在此處指定分組排序類
package com.zhaoyi.order; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderDriver { public static void main(String[] args) throws Exception{ Job job = Job.getInstance(new Configuration()); job.setJarByClass(OrderDriver.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 設置自定義分組排序類 job.setGroupingComparatorClass(OrderGroupComparator.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 1:0); } }
運行代碼,返回結果:
Order_0000001 1000.4 Order_0000002 1622.4 Order_0000003 222.8
在reducer中若是進行迭代,會更新key值,請注意。