【hadoop】19.MapReduce-手機流量統計

簡介

從本章節您可以學習到:手機流量統計案例。

接下來我們有一系列待分析的文件,其結構如下所示:

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
...

需求:統計每個手機號耗費的總上行流量、總下行流量以及總流量(涉及自定義序列化)。

這裡沒有詳細告訴我們每一列具體是什麼,只能憑直覺去定義。但這並不影響我們的操作。同時需要瞭解的是,現實中的數據往往比這個亂得多,因此數據挖掘是我們鑽研大數據不得不學習的另一個領域。

一、需求分析

很顯然,我們需要的是電話號碼(第2列,即從0計數時的第1列)、上行流量、下行流量,以及將上下行流量彙總後的總流量。

一、我們需要什麼?

這時候,Hadoop 內建的基本序列化類型(如 LongWritable、Text)已無法同時承載上行、下行與總流量,不能滿足我們的數據傳輸需求。因此,我們需要自定義一個實現 Writable 介面的 Bean 序列化類型。

二、數據如何分割?

通過 \t(製表符)進行分割。

三、分割之後如何獲取數據?

假設分割之後字段總數為 n(從0開始計數),則:第1列為電話號碼,第n-3列為上行流量,第n-2列為下行流量。之所以從尾部倒數定位流量列,是因為中間的字段(如訪問的網址)可能為空或缺失,從後往前數比從前往後數更穩妥。

一、簡單實現

1.一、自定義Bean類

package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;// 上行流量
    private long downFlow;// 下行流量
    private long totalFlow;// 總流量

    // 無參構造
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    // 序列化
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    // set方法,一次性設置屬性
    public void set(long upFlow, long downFlow){
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    // 反序列化 - 順序和序列化保持一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getTotalFlow() {
        return totalFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setTotalFlow(long totalFlow) {
        this.totalFlow = totalFlow;
    }

    // 使用製表符分隔
    @Override
    public String toString() {
        return "upFlow=" + upFlow +
                "\t" + downFlow +
                "\t" + totalFlow;
    }
}

1.二、Mapper類

package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one tab-separated traffic record and emits {@code (phoneNumber, FlowBean(up, down))}.
 *
 * <p>Records too short to contain the phone and flow columns, or whose flow columns are not
 * numeric, are skipped. Each skipped record increments the {@code FlowMapper/MALFORMED_RECORDS}
 * counter instead of failing the whole task.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /** 0-based index of the phone-number column. */
    private static final int PHONE_NUMBER_INDEX = 1;
    /** Up-flow column, counted backwards from the end of the record (length - 3). */
    private static final int UP_FLOW_BACKWARDS = 3;
    /** Down-flow column, counted backwards from the end of the record (length - 2). */
    private static final int DOWN_FLOW_BACKWARDS = 2;
    /** Fewest fields for which the phone column lies strictly before the up-flow column. */
    private static final int MIN_FIELDS = PHONE_NUMBER_INDEX + UP_FLOW_BACKWARDS + 1;

    private static final String COUNTER_GROUP = "FlowMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Output objects reused across map() calls, as in the standard Hadoop examples;
    // the framework serializes key/value when context.write is called.
    private final Text outKey = new Text();
    private final FlowBean outVal = new FlowBean();

    /**
     * @param key     byte offset of the line in the input split (unused)
     * @param value   one raw record line
     * @param context job context used to emit output and update counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line into fields; empty middle fields are kept as "".
        String[] fields = value.toString().split("\t");
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // 2. The flow columns are located from the end, because middle fields may be missing.
        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - UP_FLOW_BACKWARDS].trim());
            downFlow = Long.parseLong(fields[fields.length - DOWN_FLOW_BACKWARDS].trim());
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // 3. Key = phone number, value = flows of this single record.
        outKey.set(fields[PHONE_NUMBER_INDEX].trim());
        outVal.set(upFlow, downFlow);

        // 4. Emit.
        context.write(outKey, outVal);
    }
}

1.三、Reducer類

package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Aggregates every per-record {@link FlowBean} of one phone number into a single
 * {@link FlowBean} holding the summed up-flow, down-flow and total flow.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /** Output value, refilled for every phone number instead of allocated per call. */
    private final FlowBean summary = new FlowBean();

    /**
     * @param key     phone number
     * @param values  flow records emitted by the mapper for that phone number
     * @param context used to write {@code (phoneNumber, summedFlows)}
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upSum = 0L;
        long downSum = 0L;

        for (FlowBean record : values) {
            // Copy the primitives out immediately; the framework may reuse the same
            // FlowBean instance for each element of the iteration.
            upSum += record.getUpFlow();
            downSum += record.getDownFlow();
        }

        summary.set(upSum, downSum);
        context.write(key, summary);
    }
}

1.四、驅動類

該類負責配置並提交作業:加載 Mapper、Reducer,設置輸入輸出路徑與類型,然後執行任務。

package com.zhaoyi.phoneflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Entry point that configures and submits the phone-flow statistics job.
 */
public class FlowDriver {
    /**
     * Configures the job and blocks until it finishes.
     *
     * <p>The process exit status follows the usual convention: 0 when the job succeeds,
     * 1 when it fails, and -1 (255) when the arguments are wrong.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path (must not exist yet)
     * @throws Exception propagated from job submission / execution
     */
    public static void main(String[] args) throws Exception {
        // 0. Validate arguments; usage errors go to stderr.
        if (args.length != 2) {
            System.err.println("Please enter the parameter: data input and output paths...");
            System.exit(-1);
        }

        // 1. Create the job.
        Job job = Job.getInstance(new Configuration());

        // 2. Locate the jar containing this class, so the cluster can ship it to tasks.
        job.setJarByClass(FlowDriver.class);

        // 3. Wire mapper and reducer.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Key/value types. Map output equals final output here; it is stated explicitly so a
        //    later change to the reducer's output types cannot silently break the map side.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5. Input / output locations.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Run, and exit with 0 on success and non-zero on failure (was inverted before).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

1.五、測試

這一次我們在本地運行測試。運行之前,請確保已將 Hadoop 安裝包解壓到 Windows 的某個目錄,並設置好相應的環境變量(如 HADOOP_HOME),具體可參考前面的章節。

一、點擊主程序Driver類,添加運行參數

本地輸入參數配置

二、將我們的 phone_number.txt 文件放到第一個參數指定的目錄中。

第二個參數指定結果輸出目錄。注意:如果該目錄已存在,必須先手動刪除,否則 Hadoop 會因輸出目錄已存在而報錯(FileAlreadyExistsException)。前面我們已經強調過這個問題,並解釋了原因。

三、直接運行,即可得到輸出結果。結果位於第二個參數指定的目錄中,我們直接查看結果集:

D:\hadoop\output>type part-r-00000
13480253104     180     180     360
13502468823     7335    110349  117684
13560436666     1116    954     2070
13560439658     2034    5892    7926
13602846565     1938    2910    4848
13660577991     6960    690     7650
13719199419     240     0       240
13726230503     2481    24681   27162
13726238888     2481    24681   27162
13760778710     120     120     240
13826544101     264     0       264
13922314466     3008    3720    6728
13925057413     11058   48243   59301
13926251106     240     0       240
13926435656     132     1512    1644
15013685858     3659    3538    7197
15920133257     3156    2936    6092
15989002119     1938    180     2118
18211575961     1527    2106    3633
18320173382     9531    2412    11943
84138413        4116    1432    5548

本案例的代碼參見github項目phoneflow模塊。

相關文章
相關標籤/搜索