In this chapter you will learn: a mobile phone traffic statistics case study.
We are given a set of log files to analyze, structured as follows:
```
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
...
```
Requirement: for each phone number, compute the total upstream traffic, the total downstream traffic, and the combined total traffic (a serialization exercise).
We are not told exactly what each column means, so we have to assign meanings by intuition; this does not affect the exercise. Bear in mind that real-world data is usually far messier than this, which is why data mining is another field that anyone digging into big data eventually has to study.
Clearly, what we need are the phone number (the second column), the upstream traffic, the downstream traffic, and the total traffic obtained by summing the two.
1. What do we need?
The basic Writable types used for serialized transport no longer meet our needs here, so we define a custom serializable bean type.
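For reference, the `Writable` contract our bean must satisfy is small; Hadoop only requires two methods (plus a no-argument constructor, since the framework instantiates the type reflectively during deserialization):

```java
// org.apache.hadoop.io.Writable, as defined by Hadoop (shown for reference)
public interface Writable {
    void write(DataOutput out) throws IOException;    // serialize the fields
    void readFields(DataInput in) throws IOException; // read them back in the same order
}
```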
2. How do we split the data?
By splitting each line on the tab character (`\t`).
3. After splitting, how do we extract the fields?
Assume the split yields n fields. Counting from 0, field 1 is the phone number, field n-3 is the upstream traffic, and field n-2 is the downstream traffic. We index the traffic fields from the end because row lengths vary: as the sample above shows, some records include a URL column and some do not.
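As a quick sanity check of those indices, here is a standalone sketch (not part of the job itself; the sample line is taken from the data above):

```java
public class IndexCheck {
    public static void main(String[] args) {
        // One sample row; fields are tab-separated in the real file
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t"
                + "120.196.100.82\ti02.c.aliimg.com\t24\t27\t2481\t24681\t200";
        String[] f = line.split("\t");
        int n = f.length;
        System.out.println(f[1]);     // phone number: 13726230503
        System.out.println(f[n - 3]); // upstream traffic: 2481
        System.out.println(f[n - 2]); // downstream traffic: 24681
    }
}
```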
First, the custom bean. It implements `Writable` and carries the two counters plus their sum:

```java
package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic
    private long totalFlow; // total traffic

    // No-arg constructor, required so Hadoop can instantiate the bean reflectively
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    // Serialization
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    // Set all properties in one call
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    // Deserialization - the field order must match write()
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }

    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }
    public long getTotalFlow() { return totalFlow; }

    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public void setTotalFlow(long totalFlow) { this.totalFlow = totalFlow; }

    // Tab-separated output, matching the result file shown below
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
}
```
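If you want to verify the bean's serialization locally, a minimal round trip through plain Java data streams works. This is a hypothetical sketch; `FlowBeanTest` is not part of the project:

```java
package com.zhaoyi.phoneflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanTest {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(2481, 24681);

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // expected: 2481	24681	27162
    }
}
```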
The Mapper pulls the phone number and the two traffic fields out of each line:

```java
package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private final int PHONE_NUMBER_INDEX = 1;
    private final int UP_FLOW_BACKWARDS = 3;
    private final int DOWN_FLOW_BACKWARDS = 2;
    // Reused across map() calls to avoid allocating per record
    private Text outKey = new Text();
    private FlowBean outVal = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The input key is the line's byte offset; we don't need it
        // 1. Read one line
        String line = value.toString();
        // 2. Split the line
        String[] strings = line.split("\t");
        // 3. Output key - the phone number
        String phoneNumber = strings[PHONE_NUMBER_INDEX];
        outKey.set(phoneNumber);
        // 4. Output value - the FlowBean
        // (1) upstream traffic
        long upFlow = Long.parseLong(strings[strings.length - UP_FLOW_BACKWARDS]);
        // (2) downstream traffic
        long downFlow = Long.parseLong(strings[strings.length - DOWN_FLOW_BACKWARDS]);
        outVal.set(upFlow, downFlow);
        // 5. Write the record
        context.write(outKey, outVal);
    }
}
```
The Reducer accumulates the traffic for each phone number:

```java
package com.zhaoyi.phoneflow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outVal = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upFlow = 0;
        long downFlow = 0;
        // Sum the traffic for this phone number
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
        }
        // Set the output value
        outVal.set(upFlow, downFlow);
        // Write the result
        context.write(key, outVal);
    }
}
```
Finally, the driver. This class wires the Mapper and Reducer together and submits the job:
```java
package com.zhaoyi.phoneflow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        // 0. Validate arguments
        if (args.length != 2) {
            System.out.println("Please enter the parameters: data input and output paths...");
            System.exit(-1);
        }

        // Get the job
        Job job = Job.getInstance(new Configuration());

        // Locate the jar via the driver class
        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion returns true on success, so exit with 0 on success
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
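For completeness, once packaged as a jar the same driver could also be submitted to a cluster roughly like this (the jar name and HDFS paths here are placeholders, not taken from the project):

```
hadoop jar phoneflow-1.0.jar com.zhaoyi.phoneflow.FlowDriver /flow/input /flow/output
```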
This time we will run the test locally. Before running, make sure you have installed the Hadoop binaries into a directory on Windows and set the environment variables, as described in the earlier chapters.
1. Open the run configuration of the Driver main class and add the run arguments.
2. Place our phone_number.txt file in the directory specified by the first argument.
The second argument specifies the output directory. Note that if this directory already exists, you must delete it by hand; otherwise Hadoop aborts with an exception. We stressed this point earlier and explained the reason. (A programmatic workaround is sketched after the output below.)
3. Run the program. The results appear in the directory specified by the second argument; let's inspect the result file directly:
```
D:\hadoop\output>type part-r-00000
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548
```
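As promised above: instead of deleting the output directory by hand before each run, the driver could remove it up front via the Hadoop FileSystem API. This is a sketch of an optional addition, not part of the original driver:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Inside FlowDriver.main(), before the job is submitted:
Configuration conf = new Configuration();
Path outputPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete recursively
}
```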
The code for this case study can be found in the phoneflow module of the GitHub project.