【大數據分析經常使用算法】1.二次排序

簡介

本算法教程系列創建在您已經有了spark以及Hadoop的開發基礎,若是沒有的話,請觀看本博客的hadoop相關教程或者自行學習,代碼會在博客文檔寫到必定程度統一放到github下。java

二次排序是指在reducer階段對與某個鍵關聯的值進行排序,也叫做值轉換。git

MapReduce框架會自動對映射器生成的鍵進行排序。這說明,在reducer階段準備執行以前,他得到的數據必然是按鍵有序的(而不是值)。傳入各個reducer的值並非有序的,他們的順序咱們沒法肯定(取決於資源調配過程的處理順序)。咱們的實際需求中,對值的排序狀況是很常見的。爲了解決這種問題,咱們須要用到二次排序的設計模式。github

二次排序問題是指在reducer階段對與某個鍵關聯的值進行排序,也叫做值轉換。爲了理解方便,咱們定義通常的MapReduce處理過程的公式以下: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ reduce(key_{2},list(value_{2})) => list(key_{3},value_{3}) $$算法

咱們對這兩個公式進行說明:首先,map函數接受一個k1-v1,而後他會輸出任何數量的k2-v2對。接下來,reduce函數接收另外一個k-list(v)做爲輸入,而後將其進行處理,輸出另外一個k-v。apache

顯然reduce函數的輸入list(value_{2})中的{v1,v2,vn....}是一個無序的,二次排序的目的就是讓他們變得有序!所以,咱們能夠根據上面的公式的模式來定義二次排序的公式,以下所示: $$ map(key_{1},value_{1}) => list(key_{2}, value_{2}) $$ $$ sort(V_{1},V_{2}....V_{n}) => ({S_{1},S{2}}...S_{n}) $$ V表明無序變量,S表明有序變量。設計模式

和以前同樣,學以至用。既然理解了二次排序的概念,咱們就來經過一些可以觸類旁通的用例來掌握二次排序的設計模式。app

一、案例

1.一、需求

假定咱們有以下的溫控數據。框架

2018,01,01,10
2018,01,02,5
2018,01,03,3
2018,01,04,12
2016,11,05,20
2016,11,15,30
2016,03,25,11
2016,04,22,19
2015,06,11,22
2015,06,10,33
2015,07,08,21
2015,02,06,5
2017,11,05,5
2017,11,04,0
2017,02,02,3
2017,02,03,9
2014,06,11,22
2014,06,10,33
2014,07,08,21
2014,07,06,5

該數據是按逗號分隔的每一行,分別表明年、月、日以及當天的溫度。用公式表現爲 $$ L_{0}=Year,L_{1}=Month,L_{2}=Day,L_{3}=Temperature,S=',' $$ 其中L表明列的意思,下標表明列因此(從0計數),S表明分隔符號,這裏的分隔符是英文逗號,整個文件以UTF-8編碼(若數據不含中文能夠沒必要在乎)。dom

如今咱們要求處理這段數據,按年月排序倒敘輸出的同時(忽略天氣),統計各個月以內溫度的變化趨勢,一樣按溫度的降序排列,輸出在同一行,下面是輸出範例:ide

2018-01	12,10,5,3
2017-11	5,0
2017-02	9,3
2016-11	30,20
...
...

需求公式以下: $$ V(List(dateAndTemperature)) => S(List(date, S(Listtemparature))) $$ 其中date刪除了日期。

二、項目解決

2.一、Mapper的輸出key

第一步,肯定Map階段生成的key,顯然,咱們得將其定義爲包含年月以及溫度的WritableBean。

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;

public class DateTemperature implements Writable {
    // year month
    private String yearMonth;
    // day
    private String day;
    // temperature
    private Integer temperature;

    // reflect must be need.
    public DateTemperature() {
    }

    public DateTemperature(String yearMonth, String day, Integer temperature) {
        this.yearMonth = yearMonth;
        this.day = day;
        this.temperature = temperature;
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }

    // 序列化接口
    public void write(DataOutput out) throws IOException {
        out.writeUTF(yearMonth);
        out.writeUTF(day);
        out.writeInt(temperature);
    }

    public void readFields(DataInput in) throws IOException {
        yearMonth = in.readUTF();
        day = in.readUTF();
        temperature = in.readInt();
    }

    @Override
    public String toString() {
        return yearMonth + ","+ temperature;
    }
}

因爲咱們要將DateTemperature定義爲Mapper的輸出key,所以,咱們還須要定製其排序邏輯,讓mapper階段的shuffle爲咱們自動排序。

按照需求,應該優先按年月進行排序,再而後考慮溫度,即所謂的thenby方式。注意與二次排序的設計模式區分開。

public class DateTemperature implements Writable, WritableComparable<DateTemperature> {
    ...
        // 排序
    public int compareTo(DateTemperature o) {
        int result = this.getYearMonth().compareTo(o.getYearMonth());
        if(result == 0){
            result = this.getTemperature().compareTo(o.getTemperature());
        }
        // 降序排序。若要升序排序,直接返回result.
        return  -1 * result;
    }

}

2.二、Mapper

肯定了Bean以後,咱們就能夠來定製Mapper的邏輯了。代碼以下:

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TemperatureMapper extends Mapper<LongWritable, Text, DateTemperature, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokens = line.split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        // tokens[2] is Day, it isn't necessary.
        String day = tokens[2];
        Integer temperature = Integer.valueOf(tokens[3]);
        // k->bean v-temperature
        context.write(new DateTemperature(yearMonth,tokens[2], temperature),new IntWritable(temperature));

    }
}

能夠看到,Mapper輸出的是(Bean-溫度)的鍵值對。若是咱們不對輸出的key作任何處理,顯然reduce任務會根據key對象的hash值肯定處理次數(分組)。在這種狀況下,咱們須要將年月相同的記錄聚集到一個reducer進行處理。

要實現該功能,就須要使用自定義的分組比較器。

2.三、GroupComparator

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TemperatureGroupComparator extends WritableComparator {

    public TemperatureGroupComparator(){
        super(DateTemperature.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((DateTemperature) a).getYearMonth().compareTo(((DateTemperature)b).getYearMonth());
    }
}

該分組比較器取代mapreduce的默認分組比較行爲(按key的hash),修改成按key中的年月字典序進行比較,這樣,就能夠將他們聚集在一塊兒,統一由一個reducer處理了。

2.四、Reducer

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TemperatureReducer extends Reducer<DateTemperature, IntWritable,Text, Text> {
    @Override
    protected void reduce(DateTemperature key, Iterable<IntWritable> temperatures, Context context) throws IOException, InterruptedException {
        StringBuilder values = new StringBuilder();
        for (IntWritable temperature : temperatures) {
            values.append(temperature);
            values.append(",");
        }
        // delete the last symbol
        values.deleteCharAt(values.length() - 1);
        // output like 2018-01 22,23...
        context.write(new Text(key.getYearMonth()), new Text(values.toString()));
    }
}

reducer的邏輯比較簡單,將發送到本身的數據進行彙總,按需求的格式進行輸出。注意此處的temperatures,已是排好序的了。

最後,咱們編寫Driver類,編寫常規的mapreduce模板代碼。

2.五、Driver驅動類

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.UUID;

public class TemperatureDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TemperatureDriver.class);
        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);

        job.setMapOutputKeyClass(DateTemperature.class);
        job.setMapOutputValueClass(IntWritable.class);

        //job.setPartitionerClass(TemperaturePartition.class);
        //job.setNumReduceTasks(2);

        // add your group comparator class.
        job.setGroupingComparatorClass(TemperatureGroupComparator.class);

        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);

        FileInputFormat.setInputPaths(job,new Path("temperature.txt"));
        FileOutputFormat.setOutputPath(job,new Path("output_" + UUID.randomUUID()) );
        System.exit(job.waitForCompletion(true)? 1:0);

    }
}

最後編寫千篇一概的驅動類,別忘了使用job.setGroupingComparatorClass(TemperatureGroupComparator.class);往任務中註冊咱們的自定義分組比較器,最後執行代碼,能夠得到一個輸出文件,輸出結果以下所示:

2018-01	12,10,5,3
2017-11	5,0
2017-02	9,3
2016-11	30,20
2016-04	19
2016-03	11
2015-07	21
2015-06	33,22
2015-02	5
2014-07	21,5
2014-06	33,22

三、項目分析

第二節咱們快速的搭建了一個解決方案,有一些細節須要特別的分析一下。在這裏,咱們須要對Map輸出的中間鍵——DateTemperature進行分析。

爲了實現二次排序,咱們就須要控制DateTemperature的排序、以及reducer處理鍵的順序。咱們將須要做用的日期以及溫度組合了起來,造成了一個組合鍵,他們之間的關係以下圖所示:

天然值也能夠理解爲鍵值對去除天然鍵以後的剩餘部分。

二次排序的關鍵問題在於,如何找出天然鍵、組合鍵,以及肯定天然值。

四、優化

本節沒有使用分區功能,全部的數據都發往同一個reducer任務上,好比下面的日誌:

2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce > reduce
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.Task] - Task 'attempt_local1691658871_0001_r_000000_0' done.
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Finishing task: attempt_local1691658871_0001_r_000000_0
2019-01-14 11:54:08,395 INFO [org.apache.hadoop.mapred.LocalJobRunner] - reduce task executor complete.

都由任務編號爲attempt_local1691658871_0001_r_000000_0的reducer task處理了。

接下來咱們指定多個分區,在每一個分區中,後臺線程按鍵進行排序,最後,最終發往不一樣的reducer進行處理,這很大程度的減輕了單臺機器(若是是集羣)的負擔。

4.一、定義分區器

package com.zhaoyi.book.algro.bean;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TemperaturePartition extends Partitioner<DateTemperature, IntWritable> {
    @Override
    public int getPartition(DateTemperature dateTemperature, IntWritable temperature, int numPartitions) {
        return Math.abs(dateTemperature.getYearMonth().hashCode() % numPartitions);
    }
}

咱們按天然鍵對reducer任務數取餘的結果(絕對值保證不爲負)做爲分區的編號。

4.二、驅動器註冊分區器並設置分區數

job.setNumReduceTasks(2);

這樣,就能夠進行一步優化了,最後彙總的文件會有兩份,分別對應不一樣的分區結果,請留意。

六、總結

二次排序是一個控制reducer對值進行排序的設計模式。當咱們須要排序的數據不止一列以上,或者須要對值進行排序,那麼能夠考慮這種模式。設計模式以下:

  • 肯定組合鍵,將組合鍵做爲屢次排序的標準
  • 肯定天然鍵,經過分組比較器控制天然鍵相同的數據聚集到同一個reducer進行處理;
  • 天然值的處理放在reducer中按業務需求肯定,整體來講,和二次排序的關鍵邏輯並沒有多大關係。
相關文章
相關標籤/搜索