大數據教程(9.1)流量彙總排序的mr實現

    上一章咱們有講到一個mapreduce案例——移動流量排序,若是咱們要將最後的輸出結果按總流量大小逆序輸出,該怎麼實現呢?本節博主將分享這個實現的過程。

    1、分析

           首先,要實現這個功能,咱們可能會想到是否有辦法將輸出的結果先緩存起來,等執行完成後,再排序一次性所有輸出。是的,這的確是一個能夠實現的思路;

           咱們能夠啓動一個reduce來處理,在reduce階段中reduce()方法每次執行時,將key和value緩存到一個TreeMap裏面,而且不執行輸出;當reduce所有切片處理完成後,會調用一個cleanup()方法,且這個方法僅會被調用一次,咱們能夠在這個方法裏面作排序輸出。

           上面的這種方式確實是能夠實現,但並非很優雅;咱們能夠利用mapreduce自身的map階段輸出key的特性來實現,這個特性就是全部的key會按照key類的compareTo方法的實現去作排序輸出。詳細過程,咱們能夠將整個需求分紅兩個mapreduce過程來執行,第一個mapreduce就和以前的博客中同樣只作統計流量,第二個mapreduce咱們就用key的特性去實現排序。

    2、實現方案(key特性實現方式)

           FlowBean(流量統計bean類)

package com.empire.hadoop.mr.flowsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 類 FlowBean.java的實現描述:流量統計bean類
 * 
 * @author arron 2018年12月1日 下午10:59:42
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    //反序列化時,須要反射調用空參構造函數,因此要顯示定義一個
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * 序列化方法
     */
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);

    }

    /**
     * 反序列化方法 注意:反序列化的順序跟序列化的順序徹底一致
     */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public String toString() {

        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

    public int compareTo(FlowBean o) {
        return this.sumFlow > o.getSumFlow() ? -1 : 1; //從大到小, 當前對象和要比較的對象比, 若是當前對象大, 返回-1, 交換他們的位置(本身的理解)
    }

}

          FlowCountSort(流量統計後的mapreduce排序實現主類)

package com.empire.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954
 * 2070 類 FlowCountSort.java的實現描述:流量排序的mapreduce主實現類
 * 
 * @author arron 2018年12月1日 下午11:00:07
 */
/**
 * Second-stage MapReduce driver: reads the per-phone totals produced by the
 * first flow-count job (lines of "phone \t upFlow \t dFlow \t sumFlow") and
 * re-emits them sorted by total flow descending, by using {@link FlowBean}
 * as the map-output key.
 *
 * Sample input lines:
 * 13480253104 180 180 360
 * 13502468823 7335 110349 117684
 * 13560436666 1116 954 2070
 */
public class FlowCountSort {

    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        // Reused across map() calls to avoid allocating one object per input
        // record (serialization copies the contents, so reuse is safe).
        FlowBean bean = new FlowBean();
        Text     v    = new Text();

        /**
         * Parses one line of the previous job's output and emits
         * (FlowBean, phone) so the shuffle sorts by the bean's compareTo.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Input is the previous job's output: already-aggregated totals per phone.
            String line = value.toString();

            String[] fields = line.split("\t");

            String phoneNbr = fields[0];

            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);

            bean.set(upFlow, dFlow);
            v.set(phoneNbr);

            context.write(bean, v);
        }

    }

    /**
     * Keys arrive already sorted by total flow (descending). Because
     * FlowBean.compareTo never returns 0, each input record normally forms
     * its own reduce group; we still iterate all values defensively so no
     * phone number is dropped if keys ever do compare equal.
     */
    static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        // Input: <bean(flow totals), phoneNbr>; output swaps them back to
        // <phoneNbr, bean> for the final file.
        @Override
        protected void reduce(FlowBean bean, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Emit every phone in the group, not just the first one, so that
            // records sharing a key are never silently discarded.
            for (Text phoneNbr : values) {
                context.write(phoneNbr, bean);
            }
        }
    }

    /**
     * Job driver. args[0] = input directory (output of the flow-count job),
     * args[1] = output directory (must not already exist).
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resoucemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        /* job.setJar("/home/hadoop/wc.jar"); */
        // Locate this program's jar from the driver class.
        job.setJarByClass(FlowCountSort.class);

        // Mapper/Reducer classes for this job.
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        // Map-output kv types (bean is the key so the shuffle sorts by it).
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Final output kv types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input directory containing the first job's output.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Output directory; Hadoop fails the job if it already exists.
        Path outPath = new Path(args[1]);
        /*
         * FileSystem fs = FileSystem.get(conf); if(fs.exists(outPath)){
         * fs.delete(outPath, true); }
         */
        FileOutputFormat.setOutputPath(job, outPath);

        // Submit the job (with its configuration and jar) and wait for completion.
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}

    3、打包運行

#提交hadoop集羣運行
hadoop jar flowsort_aaron.jar com.empire.hadoop.mr.flowsort.FlowCountSort /user/hadoop/flowcountount    /flowsort
#查看輸出結果目錄
hdfs dfs -ls /flowsort
#瀏覽輸出結果
hdfs dfs -cat /flowsort/part-r-00000

           運行效果圖:

[hadoop@centos-aaron-h1 ~]$ hadoop jar flowsort_aaron.jar com.empire.hadoop.mr.flowsort.FlowCountSort /user/hadoop/flowcountount    /flowsort
18/12/02 07:10:46 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/12/02 07:10:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/02 07:10:48 INFO input.FileInputFormat: Total input files to process : 1
18/12/02 07:10:48 INFO mapreduce.JobSubmitter: number of splits:1
18/12/02 07:10:48 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/02 07:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1543705650872_0001
18/12/02 07:10:50 INFO impl.YarnClientImpl: Submitted application application_1543705650872_0001
18/12/02 07:10:50 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1543705650872_0001/
18/12/02 07:10:50 INFO mapreduce.Job: Running job: job_1543705650872_0001
18/12/02 07:11:00 INFO mapreduce.Job: Job job_1543705650872_0001 running in uber mode : false
18/12/02 07:11:00 INFO mapreduce.Job:  map 0% reduce 0%
18/12/02 07:11:10 INFO mapreduce.Job:  map 100% reduce 0%
18/12/02 07:11:23 INFO mapreduce.Job:  map 100% reduce 100%
18/12/02 07:11:23 INFO mapreduce.Job: Job job_1543705650872_0001 completed successfully
18/12/02 07:11:23 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=801
                FILE: Number of bytes written=396695
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=725
                HDFS: Number of bytes written=594
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=6980
                Total time spent by all reduces in occupied slots (ms)=8661
                Total time spent by all map tasks (ms)=6980
                Total time spent by all reduce tasks (ms)=8661
                Total vcore-milliseconds taken by all map tasks=6980
                Total vcore-milliseconds taken by all reduce tasks=8661
                Total megabyte-milliseconds taken by all map tasks=7147520
                Total megabyte-milliseconds taken by all reduce tasks=8868864
        Map-Reduce Framework
                Map input records=21
                Map output records=21
                Map output bytes=753
                Map output materialized bytes=801
                Input split bytes=131
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=801
                Reduce input records=21
                Reduce output records=21
                Spilled Records=42
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=402
                CPU time spent (ms)=1890
                Physical memory (bytes) snapshot=342441984
                Virtual memory (bytes) snapshot=1694273536
                Total committed heap usage (bytes)=137867264
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=594
        File Output Format Counters 
                Bytes Written=594

           運行結果:

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /flowsort
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-02 07:11 /flowsort/_SUCCESS
-rw-r--r--   2 hadoop supergroup        594 2018-12-02 07:11 /flowsort/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /flowsort/part-r-00000
13502468823     36675   551745  588420
13925057413     55290   241215  296505
13726238888     12405   123405  135810
13726230503     12405   123405  135810
18320173382     47655   12060   59715
13560439658     10170   29460   39630
13660577991     34800   3450    38250
15013685858     18295   17690   35985
13922314466     15040   18600   33640
15920133257     15780   14680   30460
84138413        20580   7160    27740
13602846565     9690    14550   24240
18211575961     7635    10530   18165
15989002119     9690    900     10590
13560436666     5580    4770    10350
13926435656     660     7560    8220
13480253104     900     900     1800
13826544101     1320    0       1320
13926251106     1200    0       1200
13760778710     600     600     1200
13719199419     1200    0       1200

    4、最後總結

           細心的小夥伴們從上的mapreduce主代碼中確定會看出和以前的寫法有所差異,以下圖所示:

        此處咱們以前都是在map方法裏面去聲明對象,那麼以前的作法有什麼問題呢?那就是以前的代碼在數據不少的時候,每次調用map都會建立新的對象,有可能會致使內存溢出。可是,若是咱們像上面這樣寫,就只建立一個對象,在map中設置相應的值,而後序列化輸出,再依次重複前面的設置動作便可。注意,此處是由於mapreduce會對輸出作序列化,輸出的是序列化後的內容拷貝,因此複用同一個對象並不影響結果。

        最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。

相關文章
相關標籤/搜索