【hadoop】23.MapReduce-shuffle之排序

簡介

排序是MapReduce框架中最重要的操做之一。java

在Mapper和Reducer階段都有涉及,Map Task和Reduce Task均會對數據(按照key)進行排序。該操做屬於Hadoop的默認行爲。任何應用程序中的數據均會被排序,而無論邏輯上是否須要。web

對於Map Task,它會將處理的結果暫時放到一個緩衝區中,當緩衝區使用率達到必定閾值後,再對緩衝區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢後,它會對磁盤上全部文件進行一次合併,以將這些文件合併成一個大的有序文件。apache

對於Reduce Task,它從每一個Map Task上遠程拷貝相應的數據文件,若是文件大小超過必定閾值,則放到磁盤上,不然放到內存中。架構

若是磁盤上文件數目達到必定閾值,則進行一次合併以生成一個更大文件;若是內存中文件大小或者數目超過必定閾值,則進行一次合併後將數據寫到磁盤上。當全部數據拷貝完畢後,Reduce Task統一對內存和磁盤上的全部數據進行一次合併。app

一、排序分類

一、部分排序框架

MapReduce根據輸入記錄的鍵對數據集排序,保證輸出的每一個文件內部排序。webapp

二、全排序ide

如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構。函數

全排序的解決方案:oop

  • 首先建立一系列排好序的文件;
  • 其次,串聯這些文件;
  • 最後,生成一個全局排序的文件。

主要思路是使用一個分區來描述輸出的全局排序。例如:能夠爲上述文件建立3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。

三、輔助排序:(GroupingComparator分組)

Mapreduce框架在記錄到達reducer以前按鍵對記錄排序,但鍵所對應的值並無被排序。甚至在不一樣的執行輪次中,這些值的排序也不固定,由於它們來自不一樣的map任務且這些map任務在不一樣輪次中完成時間各不相同。

通常來講,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。可是,有時也須要經過特定的方法對鍵進行排序和分組等以實現對值的排序。

二、自定義排序

不少時候,默認的排序邏輯並不能知足咱們的需求,例如流量Bean的排序。咱們須要自定義排序邏輯。

原理很簡單:對象實現WritableComparable接口重寫compareTo方法,就能夠實現排序。

public int compareTo(FlowBean o) {
	// 倒序排列,從大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

三、排序案例:彙總流量排序輸出

咱們如今有了這樣的需求,須要將手機流量的輸出結果集不按默認的手機號碼序輸出,而是按照彙總流量從高到低輸出。

一、分析

首先要明確的是,排序只能針對key進行,而咱們的彙總流量屬於Val,所以,咱們須要將FlowBean做爲key進行中間排序。須要從新定義一個MapReducer任務。

二、編寫代碼

(1)建立一個新的模塊,phone-flow-sort。

(2)編寫流量Bean,和以前的Bean差很少,只不過實現了排序接口

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable, WritableComparable<FlowBean> {
    ...
    // desc sort.
    public int compareTo(FlowBean other) {
        return this.getTotalFlow()>other.getTotalFlow()? -1:1;
    }
    ...
}

(3)Mapper

package com.zhaoyi.flowbeansort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    FlowBean bean = new FlowBean();
    Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] strings = line.split("\t");
        // Key-FlowBean: get the up and downFlow.
        bean.set(Long.parseLong(strings[1]), Long.parseLong(strings[2]));
        // Val-Phone
        phone.set(value);
        context.write(bean, phone);
    }
}

須要明確的是,咱們當前的輸入文件即爲phoneflow項目的輸出結果,其結構以下

13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548

所以,相對於當前的Mapper,輸入類型爲行號,Val爲Text,即一行數據。如今,咱們只需將FlowBean做爲Mapper的key輸出,電話號碼做爲val。

而後在reducer中將他們顛倒,便可完成咱們的需求。

(4)Reducer

package com.zhaoyi.flowbeansort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // the values must be only one, and we just need upside down key-val.
        context.write(values.iterator().next(), key);
    }
}

(5)Driver

package com.zhaoyi.flowbeansort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowSortDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(FlowSortDriver.class);

        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
       
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);


        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 1:0);

    }
}

因爲Mapper與Reducer的輸出類型不同,所以須要指定Map的輸出k-v類型,不能像以前的那樣省略這裏的代碼了。

配置參數program arg,以以前的項目的輸出做爲輸入。運行項目,將輸出結果生成到output2中。

D:\hadoop\output D:\hadoop\output2

查看結果分區文件

13502468823	7335	110349	117684	7335	110349	117684
13925057413	11058	48243	59301	11058	48243	59301
13726238888	2481	24681	27162	2481	24681	27162
13726230503	2481	24681	27162	2481	24681	27162
18320173382	9531	2412	11943	9531	2412	11943
13560439658	2034	5892	7926	2034	5892	7926
13660577991	6960	690	7650	6960	690	7650
15013685858	3659	3538	7197	3659	3538	7197
13922314466	3008	3720	6728	3008	3720	6728
15920133257	3156	2936	6092	3156	2936	6092
84138413	4116	1432	5548	4116	1432	5548
13602846565	1938	2910	4848	1938	2910	4848
18211575961	1527	2106	3633	1527	2106	3633
15989002119	1938	180	2118	1938	180	2118
13560436666	1116	954	2070	1116	954	2070
13926435656	132	1512	1644	132	1512	1644
13480253104	180	180	360	180	180	360
13826544101	264	0	264	264	0	264
13926251106	240	0	240	240	0	240
13760778710	120	120	240	120	120	240
13719199419	240	0	240	240	0	240

實現了咱們最終的需求。

能夠看到,整個過程咱們其實只是定義了FlowBean的排序邏輯,同時將其做爲key而已(具體來講,是Mapper的輸出key)。

相關文章
相關標籤/搜索