文章出處:http://blog.csdn.net/sdksdk0/article/details/51628874html
做者:朱培java
---------------------------------------------------------------------------------------------------------------git
本文是結合Hadoop中的mapreduce來對用戶數據進行分析,統計用戶的手機號碼、上行流量、下行流量、總流量的信息,同時能夠按照總流量大小對用戶進行分組排序等。是一個很是簡潔易用的hadoop項目,主要用戶進一步增強對MapReduce的理解及實際應用。文末提供源數據採集文件和系統源碼。github
本案例很是適合hadoop初級人員學習以及想入門大數據、雲計算、數據分析等領域的朋友進行學習。apache
1、待分析的數據源
如下是一個待分析的文本文件,裏面有很是多的用戶瀏覽信息,保擴用戶手機號碼,上網時間,機器序列號,訪問的IP,訪問的網站,上行流量,下行流量,總流量等信息。這裏只截取一小段,具體文件在文末提供下載連接。app
![](http://static.javashuo.com/static/loading.gif)
2、基本功能實現
想要統計出用戶的上行流量、下行流量、總流量信息,咱們須要創建一個bean類來對數據進行封裝。因而新建應該Java工程,導包,或者直接創建一個MapReduce工程。在這裏面創建一個FlowBean.java文件。
- private long upFlow;
- private long dFlow;
- private long sumFlow;
而後就是各類右鍵生成get,set方法,還要toString(),以及生成構造函數,(千萬記得要生成一個空的構造函數,否則後面進行分析的時候會報錯)。
完整代碼以下:
- package cn.tf.flow;
-
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
-
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.io.WritableComparable;
-
- public class FlowBean implements WritableComparable<FlowBean>{
-
- private long upFlow;
- private long dFlow;
- private long sumFlow;
- public long getUpFlow() {
- return upFlow;
- }
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
- public long getdFlow() {
- return dFlow;
- }
- public void setdFlow(long dFlow) {
- this.dFlow = dFlow;
- }
- public long getSumFlow() {
- return sumFlow;
- }
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
- public FlowBean(long upFlow, long dFlow) {
- super();
- this.upFlow = upFlow;
- this.dFlow = dFlow;
- this.sumFlow = upFlow+dFlow;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- upFlow=in.readLong();
- dFlow=in.readLong();
- sumFlow=in.readLong();
-
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(upFlow);
- out.writeLong(dFlow);
- out.writeLong(sumFlow);
- }
- public FlowBean() {
- super();
- }
-
- @Override
- public String toString() {
-
- return upFlow + "\t" + dFlow + "\t" + sumFlow;
- }
- @Override
- public int compareTo(FlowBean o) {
-
- return this.sumFlow>o.getSumFlow() ? -1:1;
- }
-
-
-
- }
而後就是這個統計的代碼了,新建一個FlowCount.java.在這個類裏面,我直接把Mapper和Reduce寫在同一個類裏面了,若是按規範的要求應該是要分開寫的。
在mapper中,獲取後面三段數據的值,因此個人這裏length-2,length-3.
- public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
-
- String line = value.toString();
-
- String[] fields = StringUtils.split(line, "\t");
- try {
- if (fields.length > 3) {
-
- String phone = fields[1];
- long upFlow = Long.parseLong(fields[fields.length - 3]);
- long dFlow = Long.parseLong(fields[fields.length - 2]);
-
-
- context.write(new Text(phone), new FlowBean(upFlow, dFlow));
- } else {
- return;
- }
- } catch (Exception e) {
-
- }
-
- }
-
- }
在reduce中隊數據進行整理,統計
- public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
-
- @Override
- protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
-
- long upSum = 0;
- long dSum = 0;
-
- for (FlowBean bean : values) {
-
- upSum += bean.getUpFlow();
- dSum += bean.getdFlow();
- }
-
- FlowBean resultBean = new FlowBean(upSum, dSum);
- context.write(key, resultBean);
-
- }
-
- }
最後在main方法中調用執行。
- public static void main(String[] args) throws Exception {
-
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- job.setJarByClass(FlowCount.class);
-
- job.setMapperClass(FlowCountMapper.class);
- job.setReducerClass(FlowCountReducer.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlowBean.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
-
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
- boolean res = job.waitForCompletion(true);
- System.exit(res ? 0 : 1);
-
- }
固然啦,還須要先在你的hdfs根目錄中創建/flow/data數據,而後我那個用戶的數據源上傳上去。
- bin/hadoop fs -mkdir -p /flow/data
- bin/hadoop fs -put HTTP_20130313143750.dat /flow/data
- bin/hadoop jar ../lx/flow.jar
把上面這個MapReduce工程打包成一個jar文件,而後用hadoop來執行這個jar文件。例如我放在~/hadoop/lx/flow.jar,而後再hadoop安裝目錄中執行
- bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount /flow/data /flow/output
最後執行結果以下:
在這整過過程當中,咱們是有yarnchild的進程在執行的,以下圖所示:當整個過程執行完畢以後yarnchild也會自動退出。
3、按總流量從大到小排序
若是你上面這個基本操做以及完成了的話,按總流量排序就很是簡單了。咱們新建一個FlowCountSort.Java.ide
所有代碼以下:函數
- package cn.tf.flow;
-
- import java.io.IOException;
-
- import org.apache.commons.lang.StringUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- public class FlowCountSort {
-
- public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
-
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
- String line=value.toString();
- String[] fields=StringUtils.split(line,"\t");
-
- String phone=fields[0];
- long upSum=Long.parseLong(fields[1]);
- long dSum=Long.parseLong(fields[2]);
-
- FlowBean sumBean=new FlowBean(upSum,dSum);
-
- context.write(sumBean, new Text(phone));
-
- }
- }
-
- public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
-
-
- @Override
- protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
- context.write(values.iterator().next(), key);
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
-
- job.setJarByClass(FlowCountSort.class);
-
- job.setMapperClass(FlowCountSortMapper.class);
- job.setReducerClass(FlowCountSortReducer.class);
-
- job.setMapOutputKeyClass(FlowBean.class);
- job.setMapOutputValueClass(Text.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
-
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
- boolean res = job.waitForCompletion(true);
- System.exit(res ? 0 : 1);
-
- }
-
- }
這個主要就是使用了FlowBean.java中的代碼來實現的,主要是繼承了WritableComparable<FlowBean>接口來實現,而後重寫了compareTo()方法。oop
- @Override
- public int compareTo(FlowBean o) {
-
- return this.sumFlow>o.getSumFlow() ? -1:1;
- }
-
按照一樣的方法對這個文件打成jar包,而後使用hadoop的相關語句進行執行就能夠了。學習
- bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort /flow/output /flow/sortoutput
結果圖:
![](http://static.javashuo.com/static/loading.gif)
4、按用戶號碼區域進行分類
流量彙總以後的結果須要按照省份輸出到不一樣的結果文件中,須要解決兩個問題:
一、如何讓mr的最終結果產生多個文件: 原理:MR中的結果文件數量由reduce
task的數量絕對,是一一對應的 作法:在代碼中指定reduce task的數量
二、如何讓手機號進入正確的文件 原理:讓不一樣手機號數據發給正確的reduce task,就進入了正確的結果文件
要自定義MR中的分區partition的機制(默認的機制是按照kv中k的hashcode%reducetask數)
作法:自定義一個類來干預MR的分區策略——Partitioner的自定義實現類
主要代碼與前面的排序是很是相似的,只要在main方法中添加以下兩行代碼就能夠了。
-
- job.setPartitionerClass(ProvincePartioner.class);
-
- job.setNumReduceTasks(5);
這裏咱們須要新建一個ProvincePartioner.java來處理號碼分類的邏輯。
- public class ProvincePartioner extends Partitioner<Text, FlowBean>{
-
-
- private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
-
- static {
-
- provinceMap.put("135", 0);
- provinceMap.put("136", 1);
- provinceMap.put("137", 2);
- provinceMap.put("138", 3);
- }
-
- @Override
- public int getPartition(Text key, FlowBean value, int numPartitions) {
-
- String prefix = key.toString().substring(0, 3);
- Integer partNum = provinceMap.get(prefix);
- if(partNum == null) partNum=4;
-
- return partNum;
- }
-
- }
執行方法和前面也是同樣的。從執行的流程中咱們能夠看到這裏啓動了5個reduce task,由於我這裏數據量比較小,因此只啓動了一個map task。
![](http://static.javashuo.com/static/loading.gif)
到這裏,整個用戶流量分析系統就所有結束了。關於大數據的更多內容,歡迎關注。點擊左上角頭像下方「點擊關注".感謝您的支持!
數據源下載地址:http://download.csdn.net/detail/sdksdk0/9545935
源碼項目地址:https://github.com/sdksdk0/HDFS_MapReduce