大數據系列之分佈式計算批處理引擎MapReduce實踐-排序

清明剛過,該來學習點新的知識點了。html

上次說到關於MapReduce對於文本中詞頻的統計使用WordCount。若是還有同窗不熟悉的能夠參考博文大數據系列之分佈式計算批處理引擎MapReduce實踐java

博文發表後不少同窗私下反映對於MapReduce的處理原理沒有了解到。在這篇博文中樓主與你們交流下MapReduce的數據處理原理及MR中各角色的職責。git

文末還有示例代碼講解。。github

1.MapReduce中的數據流動

  • 最簡單的過程: map - reduce
  • 定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce
  • 增長了在本地先進行一次reduce(優化)的過程: map - combine - partition - reduce

2.Partition的概念和使用

獲得map產生的記錄後,他們該分配給哪些reducer來處理呢?hadoop默認是根據散列值來派發,可是實際中,這並不能很高效或者按照咱們要求的去執行任務。例如,通過partition處理後,一個節點的reducer分配到了20條記錄,另外一個卻分配到了10W萬條,試想,這種狀況效率如何。又或者,咱們想要處理後獲得的文件按照必定的規律進行輸出,假設有兩個reducer,咱們想要最終結果中part-00000中存儲的是」h」開頭的記錄的結果,part-00001中存儲其餘開頭的結果,這些默認的partitioner是作不到的。因此須要咱們本身定製partition來選擇reducer。自定義partitioner很簡單,只要自定義一個類,而且繼承Partitioner類,重寫其getPartition方法就行了,在使用的時候經過調用Job的setPartitionerClass指定一下便可。apache

3.MapReduce基於key的全排序的原理

如何使用mapreduce來作全排序?最簡單的方法就是使用一個partition,由於一個partition對應一個reduce的task,然而reduce的輸入原本就是對key有序的,因此很天然地就產生了一個全排序文件。可是這種方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了mapreduce所提供的並行架構的優點。數據結構

若是是分多個partition呢,則只要確保partition是有序的就好了。首先建立一系列排好序的文件;其次,串聯這些文件(相似於歸併排序);最後獲得一個全局有序的文件。好比有1000個1-10000的數據,跑10個ruduce任務,若是進行partition的時候,可以將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據所有大於第n-1個reduce中的數據。這樣,每一個reduce出來以後都是有序的了,咱們只要concat全部的輸出文件,變成一個大的文件,就都是有序的了。架構

這時候可能會有一個疑問,雖然各個reduce的數據是按照區間排列好的,可是每一個reduce裏面的數據是亂序的啊?固然不會,不要忘了排序是MapReduce的自然特性 — 在數據達到reducer以前,mapreduce框架已經對這些數據按key排序了。app

可是這裏又有另一個問題,就是在定義每一個partition的邊界的時候,可能會致使每一個partition上分配到的記錄數相差很大,這樣數據最多的partition就會拖慢整個系統。咱們指望的是每一個partition上分配的數據量基本相同,hadoop提供了採樣器幫咱們預估整個邊界,以使數據的分配儘可能平均。框架

在Hadoop中,patition咱們能夠用TotalOrderPartitioner替換默認的分區,而後將採樣的結果傳給他,就能夠實現咱們想要的分區。在採樣時,可使用hadoop的幾種採樣工具,如RandomSampler,InputSampler,IntervalSampler。dom

關於上述過程,在《Hadoop權威指南》中有具體的講解,其中一張圖能夠幫助咱們更好地理解在排序操做中hadoop在map和reduce階段所作的事:

以上文字取自網上某些博文內容。。僅供參考。。。

4.下面介紹下一個代碼示例。

MapReduceExample

一組數據按照年齡分區,區內按照成績倒序排序
#數據內容見data/person.csv,以下
#編號,姓名,年齡,性別,成績
1,Alice,23,female,45
2,Bob,34,male,89
3,Chris,67,male,97
4,Kristine,38,female,53
5,Connor,25,male,27
6,Daniel,78,male,95
7,James,34,male,79
8,Alex,52,male,69
9,Nancy,7,female,98
10,Adam,9,male,37
11,Jacob,7,male,23
12,Mary,6,female,93
13,Clara,87,female,72
14,Monica,56,female,92
#項目要求#
1.將數據按照年齡段分區
0至20歲爲第一區,
20至50歲爲第二區,
50以上爲第三區。
2.將各區的數據按照分數倒序排序輸出
#輸出結果以下#
分區1:part-r-00000
9,Nancy,female,7       98
12,Mary,female,6       93
10,Adam,male,9 37
11,Jacob,male,7 23
分區2:part-r-00001
2,Bob,male,34   89
7,James,male,34 79
4,Kristine,female,38   53
1,Alice,female,23       45
5,Connor,male,25       27
分區3:part-r-00002
3,Chris,male,67 97
6,Daniel,male,78       95
14,Monica,female,56     92
13,Clara,female,87     72
8,Alex,male,52 69

   4.1 解決思路

  描述一下MapReduce處理數據的大概簡單流程:首先,MapReduce框架經過getSplit方法實現對原始文件的切片以後,每個切片對應着一個map task,inputSplit輸入到Map函數進行處理,中間結果通過環形緩衝區的排序compareTo(T),而後分區、自定義二次排序(若是有的話)和合並,再經過shuffle操做將數據傳輸到reduce task端,reduce端也存在着緩衝區,數據也會在緩衝區和磁盤中進行合併排序等操做,而後對數據按照Key值進行分組,而後每次處理完一個分組以後就會去調用一次reduce函數,最終輸出結果。

   4.2 具體解決思路

    A.Map端處理:單行數據拆分,對於拆分後的數據按成績分數進行排序,MapReduce框架不論是默認排序或者是自定義排序都只是對Key值進行排序,如今的狀況是這些數據不是key值,怎麼辦?其實咱們能夠將原始數據的Key值和其對應的數據組合成一個新的Key值,而後新的Key值對應的仍是以前的數字。那麼咱們就能夠將原始數據的map輸出變成相似下面的數據結構:

{[1,Alice,23,female], 45}
{[2,Bob,34,male], 89}
{[3,Chris,67,male], 97}
{[4,Kristine,38,female],53}
{[5,Connor,25,male],27}
{[6,Daniel,78,male],95}
{[7,James,34,male],79}
{[8,Alex,52,male],69}
{[9,Nancy,7,female],98}
{[10,Adam,9,male],37}
{[11,Jacob,7,male],23}
{[12,Mary,6,female],93}
{[13,Clara,87,female],72}
{[14,Monica,56,female],92}

    B.Partition分區操做:項目要求按照年齡進行分區,這裏咱們須要自定義一個分區處理器,由於個人目標不是想將全部的數據傳到同一個reduce中,而是想將年齡分區後的數據放在同一個reduce中進行分組合並,因此咱們須要根據新key值中的第三個字段來自定義一個分區處理器。經過分區操做後,獲得的數據流以下:    

partition1: 0~20
{[9,Nancy,7,female],98}
{[10,Adam,9,male],37}
{[11,Jacob,7,male],23}
{[12,Mary,6,female],93}

partition2:20~50
 {[1,Alice,23,female], 45}
{[2,Bob,34,male], 89}
{[4,Kristine,38,female],53}
{[5,Connor,25,male],27}
{[7,James,34,male],79}

partition3:50~
{[3,Chris,67,male], 97}
{[6,Daniel,78,male],95}
{[8,Alex,52,male],69}
{[13,Clara,87,female],72}
{[14,Monica,56,female],92}

   C.自定義排序操做:分區操做完成以後,我調用本身的自定義排序器對新的Key值按照成績分數進行排序。 排序後的數據流結果以下:

partition1: 0~20
{[9,Nancy,7,female],98}
{[12,Mary,6,female],93}
{[10,Adam,9,male],37}
{[11,Jacob,7,male],23}

partition2:20~50
{[2,Bob,34,male], 89}
{[7,James,34,male],79}
{[4,Kristine,38,female],53}
{[1,Alice,23,female], 45}
{[5,Connor,25,male],27}

partition3:50~
{[3,Chris,67,male], 97}
{[6,Daniel,78,male],95}
{[14,Monica,56,female],92}
{[13,Clara,87,female],72}
{[8,Alex,52,male],69}

   D.Reducer操做:通過Shuffle處理以後,數據傳輸到Reducer端輸出。

  4.2 代碼

   A.代碼結構以下

  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>mapReduceDemo</groupId>
    <artifactId>mapReduceDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <excludeScope>provided</excludeScope>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

  B.Main.java  入口

package com.m.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 操做person.csv文件
 */
public class Main extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: AgePartition <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            System.exit(2);
        }

        Configuration conf = getConf();
        //conf.set(RegexMapper.GROUP,"female");
        Job job = Job.getInstance(conf);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        job.setJarByClass(Main.class);
        job.setMapperClass(DefinedMap.class);
        //設置map的輸出key和value類型
        job.setMapOutputKeyClass(DefinedCombinationKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(DefinedReducer.class);
        //設置reduce的輸出key和value類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //自定義分區策略
        job.setPartitionerClass(DefinedPartitioner.class);
        //自定義排序策略,在自定義組合鍵重寫方法compareTo時若自定義排序策略與之相同能夠省略自定義排序策略。最終結果以自定義排序策略爲主
        job.setSortComparatorClass(DefinedSort.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(3);//reducer num  = partition num

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Main(), args);
        System.exit(res);
    }
}

   C. DefinedMap.java  {自定義分區器 class DefinedPartitioner,class Map, class Reducer}

package com.m.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * @author mengfanzhu
 * @Package com.m.mr
 * @Description: 自定義 map處理
 * @date 17/4/7 14:04
 */
public class DefinedMap extends Mapper<Object, Text, DefinedCombinationKey, IntWritable> {
    DefinedCombinationKey combinationKey=new DefinedCombinationKey();
    Text sortName = new Text();
    IntWritable score = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // no,name, age, gender, score
        String[] arr = value.toString().split(",");
        score.set(Integer.parseInt(arr[4]));
        sortName.set(arr[0]+","+arr[1]+","+arr[3]+","+arr[2]);
        combinationKey.setFirstKey(sortName);
        combinationKey.setSecondKey(score);
        context.write(combinationKey, score);
    }
}

/**
 * 自定義分區 按照年齡段分區
 */
class DefinedPartitioner extends Partitioner<DefinedCombinationKey,IntWritable> {
    @Override
    public int getPartition(DefinedCombinationKey key, IntWritable value, int n) {
        if (n == 0) {
            return 0;
        }
        String[] arr = key.getFirstKey().toString().split(",");
        int age = Integer.parseInt(arr[3]);
        if (age <= 20) {
            return 0;
        } else if (age <= 50) {
            return 1 % n;
        } else {
            return 2 % n;
        }
    }
}
/**
 * 自定義reducer處理
 */
class DefinedReducer extends Reducer<DefinedCombinationKey, IntWritable, Text, Text> {

    StringBuffer sb=new StringBuffer();
    Text sore=new Text();

    @Override
    protected void reduce(DefinedCombinationKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        sb.delete(0, sb.length());
        Iterator<IntWritable> it=values.iterator();

        while (it.hasNext()) {
            sb.append(it.next()+",");
        }

        if (sb.length()>0) {
            sb.deleteCharAt(sb.length()-1);
        }
        sore.set(sb.toString());
        context.write(key.getFirstKey(),sore);
    }
}

 

     D . 自定義二次排序策略 DefinedSort

package com.m.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author mengfanzhu
 * @Description: 自定義排序策略
 * @date 17/4/7 13:04
 */
public class DefinedSort extends WritableComparator {
    private static final Logger logger = LoggerFactory.getLogger(DefinedSort.class);
    public DefinedSort() {
        super(DefinedCombinationKey.class,true);
    }
    @Override
    public int compare(WritableComparable combinationKeyOne,
                       WritableComparable CombinationKeyOther) {
        logger.info("---------enter DefinedComparator flag---------");

        DefinedCombinationKey c1 = (DefinedCombinationKey) combinationKeyOne;
        DefinedCombinationKey c2 = (DefinedCombinationKey) CombinationKeyOther;

        logger.info("---------out DefinedComparator flag---------");
        return c2.getSecondKey().get()-c1.getSecondKey().get();//0,負數,正數
    }
}

   E.  DefinedCombinationKey.java 自定義組合鍵 

package com.m.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author mengfanzhu
 * @Description:
 * @date 17/4/7 13:01
 */
public class DefinedCombinationKey implements WritableComparable<DefinedCombinationKey> {
    private static final Logger logger = LoggerFactory.getLogger(DefinedCombinationKey.class);
    private Text firstKey;
    private IntWritable secondKey;
    public DefinedCombinationKey() {
        this.firstKey = new Text();
        this.secondKey = new IntWritable();
    }
    public Text getFirstKey() {
        return this.firstKey;
    }
    public void setFirstKey(Text firstKey) {
        this.firstKey = firstKey;
    }
    public IntWritable getSecondKey() {
        return this.secondKey;
    }
    public void setSecondKey(IntWritable secondKey) {
        this.secondKey = secondKey;
    }
    public void readFields(DataInput dateInput) throws IOException {
        // TODO Auto-generated method stub
        this.firstKey.readFields(dateInput);
        this.secondKey.readFields(dateInput);
    }
    public void write(DataOutput outPut) throws IOException {
        this.firstKey.write(outPut);
        this.secondKey.write(outPut);
    }
    /**
     * 自定義比較策略
     * 注意:該比較策略用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段,
     * 發生地點爲環形緩衝區(能夠經過io.sort.mb進行大小調整)
     */
    public int compareTo(DefinedCombinationKey definedCombinationKey) {
        logger.info("-------CombinationKey flag-------");
        return this.secondKey.compareTo(definedCombinationKey.getSecondKey()); } }

說明:
1.在自定義組合鍵的時候,咱們須要特別注意,必定要實現WritableComparable接口,而且實現compareTo方法的比較策略。這個用於mapreduce的第一次默認排序,也就是發生在map階段的sort小階段,發生地點爲環形緩衝區(能夠經過io.sort.mb進行大小調整),可是其對咱們最終的二次排序結果是沒有影響的。咱們二次排序的最終結果是由咱們的自定義比較器決定的。  
2.在此示例代碼中寫了自定義組合鍵的compareTo對於score進行正序,在自定義比較器中對score進行倒序。用來分析MR的工做原理。

 

 F.打包運行。maven :mvn clean package 

 hadoop jar mapReduceDemo-1.0-SNAPSHOT.jar  com.m.mr.Main /person.csv /out     

 

完~

數據及代碼包見

代碼示例已上傳至GitHub,https://github.com/fzmeng/MapReduceExample

相關文章
相關標籤/搜索