【hadoop】25.MapReduce-shuffle之分組

簡介

shuffle機制中的分組排序(Group)是一個創建在Reducer階段的處理過程。參看下圖的第15步驟。經過這一步驟,咱們能夠修改Reducer斷定key的邏輯,按照咱們的業務邏輯去定義那些key應該屬於同一類型的分組,從而決定那些數據走向同一個reducer。java

須要注意的是,現實開發中常常使用的分組其實就是分區功能,本節講述的是Reducer階段根據key分組的過程。apache

默認狀況下MapReduce的分組階段會根據咱們提供的key進行排序,而後將排序結果相等的放到一次reducer循環中(代碼上的體現)。而該排序過程,其實能夠由咱們去定義,也就說,將排序的結果咱們須要自定義一個GroupComparator.app

一、探究GroupComparator

咱們能夠經過輸出日誌查看當前的分組數ide

...
Reduce input groups=3
...

例如,上述狀況下分組爲3。oop

若是咱們不特殊指定分組,那麼分組數會按照key的對應WritableComparator的實現類邏輯進行排序,並按照key進行分組排序,同一組key的數據會進入reducer方法中進行處理,一組執行一次。this

以wordCount舉例,在不考慮自定義分區(partition)的狀況下(如今是默認一個分區)。咱們reducer輸入的key值是Text類型,那麼他的key排序邏輯就應該是Text的Comparator。參見源碼:編碼

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Text extends BinaryComparable

BinaryComparable,即按照字典順序進行排序。也就是說,相同的JAVA類型的String(String對應Hadoop的序列化Text類型)Key會被劃分爲一個分組,咱們來驗證一下這個想法,例如以下的輸入數據日誌

about
about
areya
akuya
mywife

按照咱們以前的理論,分組數應該是4,即兩個about會被劃分爲一組,執行一次處理。運行,查看分組日誌:code

...
Reduce input groups=4
...

就是這樣。orm

接下來,咱們探討一下如何改變分組數。其實答案顯而易見,那就是定義Key類型的Comparator邏輯,實現本身的排序行爲。

咱們來看看Hadoop類型組件中使用的Comparator有哪些,每一個類型都有本身的Comparator實現,咱們能夠進入源碼一一查看。

爲了練習分組排序,接下來咱們運行一個案例。

一、案例:訂單號分組排序

需求:獲取每種訂單號中消費最多的一條記錄,並按訂單號的字典序排序

輸入數據:第一列爲訂單號,第二列爲雜項,第三列爲消費值。

Order_0000001	Pdt_01	222.8
Order_0000002	Pdt_05	722.4
Order_0000001	Pdt_05	25.8
Order_0000003	Pdt_01	222.8
Order_0000003	Pdt_01	33.8
Order_0000002	Pdt_03	522.8
Order_0000002	Pdt_04	122.4
Order_0000002	Pdt_04	1220.4
Order_0000002	Pdt_04	1422.4
Order_0000002	Pdt_04	1522.4
Order_0000002	Pdt_04	1622.4
Order_0000001	Pdt_04	1000.4

1.一、分析

一、首先篩選掉雜項Pdt_01

二、定義訂單Bean,將其做爲Key並先根據id排序,再根據消費進行排序(value爲空值);

難題出現了,咱們的key爲訂單Bean,以他做爲key的話,咱們沒法關聯到Order_0000001的第一條數據是咱們想要的,怎麼作?顯然,咱們須要將訂單id相同的一組訂單數據放在一塊兒,這樣咱們只須要取第一條數據就能夠了。

也就是要欺騙reducer,不要以Bean對象做爲參考,而已bean對象的orderId做爲參考。

問題迎刃而解,接下來,開始編碼。

也能夠考慮結果按訂單號進行分區,實現分組效果,這是業務開發中經常使用的方式。同時,該問題的解決方案比較極端,還有更多優雅的處理方式能夠選擇,這裏只是爲了演示reducer的迭代邏輯,所以行此下策。

1.二、實現

(1) Bean

package com.zhaoyi.order;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;

    private double price;

    // second order
    public int compareTo(OrderBean o) {
        // according to order id.
        int result = this.orderId.compareTo(o.getOrderId());

        // then by price
        if(result == 0){
            result = this.getPrice() > o.getPrice()? -1:1;
        }

        return result;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    public OrderBean() {
    }

    public void set(String orderId, double price){
        this.orderId = orderId;
        this.price = price;
    }

    public OrderBean(String orderId, double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return orderId + "\t" + price ;
    }
}

(2)Mapper

package com.zhaoyi.order;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean orderBean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] strings = line.split("\t");

        orderBean.set(strings[0], Double.parseDouble(strings[2]));

        context.write(orderBean, NullWritable.get());
    }
}

(3)Reducer

package com.zhaoyi.order;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer< OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 注意此處迭代的代碼,每當咱們迭代一次,
        // key的值編寫切換到下一個分組的key值,是什麼呢?您能夠嘗試一下就知道了。
        // System.out.println("------------");
        // System.out.print(key.getOrderId());
        // int count = 0;
        // for (NullWritable value:values) {
        //     count++;
        // }
        // System.out.println("擁有"+ count + "條數據.");
        context.write(key, NullWritable.get());
    }
}

(4)GroupComparator

package com.zhaoyi.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupComparator extends WritableComparator {

    public OrderGroupComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aa = (OrderBean) a;
        OrderBean bb = (OrderBean) b;
        return aa.getOrderId().compareTo(bb.getOrderId());
    }
}

(5)驅動類,在此處指定分組排序類

package com.zhaoyi.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 設置自定義分組排序類
        job.setGroupingComparatorClass(OrderGroupComparator.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 1:0);
    }
}

運行代碼,返回結果:

Order_0000001	1000.4
Order_0000002	1622.4
Order_0000003	222.8

在reducer中若是進行迭代,會更新key值,請注意。

相關文章
相關標籤/搜索