大數據實戰:用戶流量分析系統

文章出處:http://blog.csdn.net/sdksdk0/article/details/51628874html

做者:朱培java

---------------------------------------------------------------------------------------------------------------git

 

本文是結合Hadoop中的mapreduce來對用戶數據進行分析,統計用戶的手機號碼、上行流量、下行流量、總流量的信息,同時能夠按照總流量大小對用戶進行分組排序等。是一個很是簡潔易用的hadoop項目,主要用戶進一步增強對MapReduce的理解及實際應用。文末提供源數據採集文件和系統源碼。github

本案例很是適合hadoop初級人員學習以及想入門大數據雲計算、數據分析等領域的朋友進行學習。apache

1、待分析的數據源

如下是一個待分析的文本文件,裏面有很是多的用戶瀏覽信息,保擴用戶手機號碼,上網時間,機器序列號,訪問的IP,訪問的網站,上行流量,下行流量,總流量等信息。這裏只截取一小段,具體文件在文末提供下載連接。app


2、基本功能實現

想要統計出用戶的上行流量、下行流量、總流量信息,咱們須要創建一個bean類來對數據進行封裝。因而新建應該Java工程,導包,或者直接創建一個MapReduce工程。在這裏面創建一個FlowBean.java文件。
 
[html]  view plain  copy
 
 print?
  1.        private long upFlow;  
  2. private long dFlow;  
  3. private long sumFlow;  
而後就是各類右鍵生成get,set方法,還要toString(),以及生成構造函數,(千萬記得要生成一個空的構造函數,否則後面進行分析的時候會報錯)。
完整代碼以下:
[java]  view plain  copy
 
 print?
  1. package cn.tf.flow;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6.   
  7. import org.apache.hadoop.io.Writable;  
  8. import org.apache.hadoop.io.WritableComparable;  
  9.   
  10. public class FlowBean  implements WritableComparable<FlowBean>{  
  11.       
  12.     private long upFlow;  
  13.     private long dFlow;  
  14.     private long sumFlow;  
  15.     public long getUpFlow() {  
  16.         return upFlow;  
  17.     }  
  18.     public void setUpFlow(long upFlow) {  
  19.         this.upFlow = upFlow;  
  20.     }  
  21.     public long getdFlow() {  
  22.         return dFlow;  
  23.     }  
  24.     public void setdFlow(long dFlow) {  
  25.         this.dFlow = dFlow;  
  26.     }  
  27.     public long getSumFlow() {  
  28.         return sumFlow;  
  29.     }  
  30.     public void setSumFlow(long sumFlow) {  
  31.         this.sumFlow = sumFlow;  
  32.     }  
  33.     public FlowBean(long upFlow, long dFlow) {  
  34.         super();  
  35.         this.upFlow = upFlow;  
  36.         this.dFlow = dFlow;  
  37.         this.sumFlow = upFlow+dFlow;  
  38.     }  
  39.     @Override  
  40.     public void readFields(DataInput in) throws IOException {  
  41.         upFlow=in.readLong();  
  42.         dFlow=in.readLong();  
  43.         sumFlow=in.readLong();  
  44.           
  45.     }  
  46.     @Override  
  47.     public void write(DataOutput out) throws IOException {  
  48.         out.writeLong(upFlow);  
  49.         out.writeLong(dFlow);  
  50.         out.writeLong(sumFlow);  
  51.     }  
  52.     public FlowBean() {  
  53.         super();  
  54.     }  
  55.   
  56.     @Override  
  57.     public String toString() {  
  58.            
  59.         return  upFlow + "\t" + dFlow + "\t" + sumFlow;  
  60.     }  
  61.     @Override  
  62.     public int compareTo(FlowBean o) {  
  63.           
  64.         return this.sumFlow>o.getSumFlow() ? -1:1;  
  65.     }  
  66.       
  67.       
  68.   
  69. }  

而後就是這個統計的代碼了,新建一個FlowCount.java.在這個類裏面,我直接把Mapper和Reduce寫在同一個類裏面了,若是按規範的要求應該是要分開寫的。
在mapper中,獲取後面三段數據的值,因此個人這裏length-2,length-3.
[java]  view plain  copy
 
 print?
  1.       public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {  
  2.     @Override  
  3.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  4.   
  5.         // 拿到這行的內容轉成string  
  6.         String line = value.toString();  
  7.   
  8.         String[] fields = StringUtils.split(line, "\t");  
  9.         try {  
  10.             if (fields.length > 3) {  
  11.                 // 得到手機號及上下行流量字段值  
  12.                 String phone = fields[1];  
  13.                 long upFlow = Long.parseLong(fields[fields.length - 3]);  
  14.                 long dFlow = Long.parseLong(fields[fields.length - 2]);  
  15.   
  16.                 // 輸出這一行的處理結果,key爲手機號,value爲流量信息bean  
  17.                 context.write(new Text(phone), new FlowBean(upFlow, dFlow));  
  18.             } else {  
  19.                 return;  
  20.             }  
  21.         } catch (Exception e) {  
  22.   
  23.         }  
  24.   
  25.     }  
  26.   
  27. }  

 
 
 
在reduce中隊數據進行整理,統計
 
[java]  view plain  copy
 
 print?
  1. public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {  
  2.   
  3.         @Override  
  4.         protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {  
  5.   
  6.             long upSum = 0;  
  7.             long dSum = 0;  
  8.   
  9.             for (FlowBean bean : values) {  
  10.   
  11.                 upSum += bean.getUpFlow();  
  12.                 dSum += bean.getdFlow();  
  13.             }  
  14.   
  15.             FlowBean resultBean = new FlowBean(upSum, dSum);  
  16.             context.write(key, resultBean);  
  17.   
  18.         }  
  19.   
  20.     }  


最後在main方法中調用執行。
[java]  view plain  copy
 
 print?
  1. public static void main(String[] args) throws Exception {  
  2.   
  3.         Configuration conf = new Configuration();  
  4.         Job job = Job.getInstance(conf);  
  5.   
  6.         job.setJarByClass(FlowCount.class);  
  7.   
  8.         job.setMapperClass(FlowCountMapper.class);  
  9.         job.setReducerClass(FlowCountReducer.class);  
  10.   
  11.         job.setMapOutputKeyClass(Text.class);  
  12.         job.setMapOutputValueClass(FlowBean.class);  
  13.   
  14.         job.setOutputKeyClass(Text.class);  
  15.         job.setOutputValueClass(FlowBean.class);  
  16.   
  17.         FileInputFormat.setInputPaths(job, new Path(args[0]));  
  18.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  19.   
  20.         boolean res = job.waitForCompletion(true);  
  21.         System.exit(res ? 0 : 1);  
  22.   
  23.     }  
固然啦,還須要先在你的hdfs根目錄中創建/flow/data數據,而後我那個用戶的數據源上傳上去。
[java]  view plain  copy
 
 print?
  1. bin/hadoop fs -mkdir -p /flow/data  
  2. bin/hadoop fs -put HTTP_20130313143750.dat /flow/data  
  3. bin/hadoop jar  ../lx/flow.jar  
 
把上面這個MapReduce工程打包成一個jar文件,而後用hadoop來執行這個jar文件。例如我放在~/hadoop/lx/flow.jar,而後再hadoop安裝目錄中執行
[java]  view plain  copy
 
 print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount  /flow/data  /flow/output  
 
最後執行結果以下:



在這整過過程當中,咱們是有yarnchild的進程在執行的,以下圖所示:當整個過程執行完畢以後yarnchild也會自動退出。

3、按總流量從大到小排序

若是你上面這個基本操做以及完成了的話,按總流量排序就很是簡單了。咱們新建一個FlowCountSort.Java.ide

所有代碼以下:函數

 

[java]  view plain  copy
 
 print?
  1. package cn.tf.flow;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.commons.lang.StringUtils;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Reducer;  
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15.   
  16. public class FlowCountSort {  
  17.   
  18. public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{  
  19.           
  20.         @Override  
  21.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  22.               
  23.             String line=value.toString();  
  24.             String[] fields=StringUtils.split(line,"\t");  
  25.               
  26.             String phone=fields[0];  
  27.             long upSum=Long.parseLong(fields[1]);  
  28.             long dSum=Long.parseLong(fields[2]);  
  29.               
  30.             FlowBean sumBean=new FlowBean(upSum,dSum);  
  31.               
  32.             context.write(sumBean, new Text(phone));  
  33.           
  34.         }     
  35. }  
  36.   
  37.     public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{  
  38.           
  39.         //進來的「一組」數據就是一個手機的流量bean和手機號  
  40.         @Override  
  41.         protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  42.       
  43.             context.write(values.iterator().next(), key);  
  44.         }  
  45.     }  
  46.   
  47.     public static void main(String[] args) throws Exception {  
  48.   
  49.         Configuration conf = new Configuration();  
  50.         Job job = Job.getInstance(conf);  
  51.   
  52.         job.setJarByClass(FlowCountSort.class);  
  53.   
  54.         job.setMapperClass(FlowCountSortMapper.class);  
  55.         job.setReducerClass(FlowCountSortReducer.class);  
  56.   
  57.         job.setMapOutputKeyClass(FlowBean.class);  
  58.         job.setMapOutputValueClass(Text.class);  
  59.   
  60.         job.setOutputKeyClass(Text.class);  
  61.         job.setOutputValueClass(FlowBean.class);  
  62.   
  63.         FileInputFormat.setInputPaths(job, new Path(args[0]));  
  64.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  65.   
  66.         boolean res = job.waitForCompletion(true);  
  67.         System.exit(res ? 0 : 1);  
  68.   
  69.     }  
  70.       
  71. }  


這個主要就是使用了FlowBean.java中的代碼來實現的,主要是繼承了WritableComparable<FlowBean>接口來實現,而後重寫了compareTo()方法。oop

 

 

[html]  view plain  copy
 
 print?
  1. @Override  
  2.     public int compareTo(FlowBean o) {  
  3.           
  4.         return this.sumFlow>o.getSumFlow() ? -1:1;  
  5.     }  
  6.       

按照一樣的方法對這個文件打成jar包,而後使用hadoop的相關語句進行執行就能夠了。學習

 

 

[java]  view plain  copy
 
 print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort  /flow/output  /flow/sortoutput  

結果圖:

 




4、按用戶號碼區域進行分類

流量彙總以後的結果須要按照省份輸出到不一樣的結果文件中,須要解決兩個問題:

 一、如何讓mr的最終結果產生多個文件: 原理:MR中的結果文件數量由reduce
  task的數量絕對,是一一對應的 作法:在代碼中指定reduce task的數量
 
 
  二、如何讓手機號進入正確的文件 原理:讓不一樣手機號數據發給正確的reduce task,就進入了正確的結果文件
  要自定義MR中的分區partition的機制(默認的機制是按照kv中k的hashcode%reducetask數)
  作法:自定義一個類來干預MR的分區策略——Partitioner的自定義實現類

主要代碼與前面的排序是很是相似的,只要在main方法中添加以下兩行代碼就能夠了。

 

[java]  view plain  copy
 
 print?
  1.         //指定自定義的partitioner  
  2. job.setPartitionerClass(ProvincePartioner.class);  
  3.   
  4. job.setNumReduceTasks(5);  


這裏咱們須要新建一個ProvincePartioner.java來處理號碼分類的邏輯。

 

 

[java]  view plain  copy
 
 print?
  1. public class ProvincePartioner extends Partitioner<Text, FlowBean>{  
  2.       
  3.       
  4. private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();  
  5.       
  6.     static {  
  7.           
  8.         provinceMap.put("135", 0);  
  9.         provinceMap.put("136", 1);  
  10.         provinceMap.put("137", 2);  
  11.         provinceMap.put("138", 3);        
  12.     }  
  13.       
  14.     @Override  
  15.     public int getPartition(Text key, FlowBean value, int numPartitions) {  
  16.   
  17.         String prefix = key.toString().substring(0, 3);  
  18.         Integer partNum = provinceMap.get(prefix);  
  19.         if(partNum == null) partNum=4;  
  20.           
  21.         return partNum;  
  22.     }  
  23.   
  24. }  


執行方法和前面也是同樣的。從執行的流程中咱們能夠看到這裏啓動了5個reduce task,由於我這裏數據量比較小,因此只啓動了一個map task。

 

 

到這裏,整個用戶流量分析系統就所有結束了。關於大數據的更多內容,歡迎關注。點擊左上角頭像下方「點擊關注".感謝您的支持!

 

 

數據源下載地址:http://download.csdn.net/detail/sdksdk0/9545935

源碼項目地址:https://github.com/sdksdk0/HDFS_MapReduce

相關文章
相關標籤/搜索