【hadoop】22.MapReduce-shuffle之分區

簡介

分區(partition),屬於Mapper階段的流程。前面提到,線程首先根據數據最終要傳的reducer把數據分紅相應的分區(partition)。java

默認狀況下,採起的分區類是HashPartitionergit

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

顯然,默認分區是根據key的hashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區,所以咱們經常須要自定義分區類。github

編寫自定義分區並應用的流程大體分爲3步。apache

  1. 自定義類繼承Partitioner,重寫getPartition()方法;
  2. 在job驅動類中,設置自定義partitioner;
  3. 自定義partition後,要根據自定義partitioner的邏輯設置相應數量的reduce task。

爲何須要設置reduce task的數量?app

  • 若是reduceTask的數量 > getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
  • 若是1 < reduceTask的數量 < getPartition的結果數,則有一部分分區數據無處安放,會Exception;
  • 若是reduceTask的數量 = 1,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;

例如:假設自定義分區數爲5,則ide

  • job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
  • job.setNumReduceTasks(2);會報錯
  • job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件;

一、分區案例——歸屬地分區電話號碼流量統計

接下來咱們完善以前的電話號碼手機流量統計項目。oop

完善需求:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件。this

根據手機號的前三位能夠區分不一樣歸宿地的電話號碼。線程

一、數據準備:依然是以前的輸入文件phone_data.txt;code

二、分析

Mapreduce中會將map輸出的kv對,按照相同key分組,而後分發給不一樣的reducetask。默認的分發規則爲:根據key的hashcode%reducetask數來分發。

若是要按照咱們本身的需求進行分組,則須要改寫數據分發(分組)組件Partitioner 自定義一個CustomPartitioner繼承抽象類:Partitioner

最後再從job驅動中,設置自定義partitioner(以前咱們使用默認的): job.setPartitionerClass(CustomPartitioner.class)

三、編寫代碼

(1)、在以前的項目中添加分區類

package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartition extends Partitioner<Text, FlowBean> {
    public static final String PARTITION_136 = "136";
    public static final String PARTITION_137 = "137";
    public static final String PARTITION_138 = "138";
    public static final String PARTITION_139 = "139";
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // default partition is 0.
        int partition = 0;

        // get the phone left 3 number.
        String phonePrefix = text.toString().substring(0,3);

        // get partition.
        if(PARTITION_136.equals(phonePrefix)){
            partition = 1;
        }else if(PARTITION_137.equals(phonePrefix)){
            partition = 2;
        }else if(PARTITION_138.equals(phonePrefix)) {
            partition = 3;
        }else if(PARTITION_139.equals(phonePrefix)) {
            partition = 4;
        }
        return partition;
    }
}

(2)、在驅動類中指定自定義分區類

package com.zhaoyi.phoneflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        if(args.length != 2){
            System.out.println("Please enter the parameter: data input and output paths...");
            System.exit(-1);
        }
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 設置分區類
        job.setPartitionerClass(FlowPartition.class);
        // 咱們設置了5個分區,對應上。
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 1:0);
    }
}

注意分區返回數字是從0開始計數。

以前咱們未指定分區類的時候,生成一個分區結果文件。這一次就會生成5個分區文件了,分別對應不一樣地區的電話號碼記錄。

本案例的具體代碼參見github項目phoneflow模塊。

二、分區案例 —— wordcount大小寫分區

既然改造了電話號碼案例,咱們也來改造一下wordcount案例,將單詞統計根據首字母大小寫不一樣輸出爲2個文件。即大寫字母開頭的輸出爲一個文件,小寫字母開頭的輸出爲1個文件。

一、分析

只需在原有項目的基礎上,添加分區類:獲取每一個單詞的首字母,直接使用JAVA Character類API判斷是否是小寫字母,若不是,則統一斷定爲大寫字母。

二、編寫代碼,添加分區類

package com.zhaoyi.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartition extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        char firstLetter = text.toString().charAt(0);
        if(Character.isLowerCase(firstLetter)){
            return 1;
        }
        return 0;
    }
}

別忘了在驅動類中設置自定義分區類。

三、查看輸出結果

分區文件有2個,分別表明大小寫字母的統計結果

#### part-r-00000
Alice	2
So	1
`and	1
`without	1

#### part-r-00001
a	3
and	1
bank	1
......
......
the	3
this	1
......
......
was	3
what	1

注意省略號表明我刪除掉的一些記錄,只是爲了限制篇幅,不屬於文件內容。

本案例見github項目的word-count模塊。

相關文章
相關標籤/搜索