---------------------------------------------------------------------------------------------------------------
[Copyright notice: this article is the author's original work; please credit the source when reposting]
Source: http://blog.csdn.net/sdksdk0/article/details/51628874
Author: 朱培 (Zhu Pei)
---------------------------------------------------------------------------------------------------------------
This article uses Hadoop MapReduce to analyze user data: it aggregates each user's phone number, upstream traffic, downstream traffic, and total traffic, and then groups and sorts users by total traffic. It is a compact, easy-to-follow Hadoop project, intended mainly to deepen your understanding of MapReduce and its practical application. The source data file and the full system source code are provided at the end of the article.
This case study is well suited to Hadoop beginners and to anyone looking to get started with big data, cloud computing, or data analysis.
Below is the text file to be analyzed. It contains a large number of user browsing records, including each user's phone number, access time, device MAC address, the IP address visited, the website visited, upstream traffic, downstream traffic, total traffic, and so on. Only a short excerpt is shown here; a download link for the full file is provided at the end of the article.
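For orientation, here is a hypothetical record in the same tab-separated layout (the values are invented; only the field positions matter — the phone number is the second field, and the upstream and downstream byte counts are the third- and second-from-last fields, which is exactly how the mapper below indexes them):

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200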
The bean needs three fields:

private long upFlow;
private long dFlow;
private long sumFlow;

Then right-click in the IDE to generate the getters and setters, toString(), and the constructors. Be sure to also generate an empty (no-argument) constructor, otherwise the job will fail with errors later when the analysis runs.
package cn.tf.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // Hadoop instantiates the bean reflectively during deserialization,
    // so the no-argument constructor is required.
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long dFlow) {
        super();
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getdFlow() { return dFlow; }
    public void setdFlow(long dFlow) { this.dFlow = dFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    // Deserialization: fields must be read in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialization.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

    // Sort in descending order of total traffic. This deliberately never returns 0,
    // so beans with equal totals are not merged into a single reduce group.
    @Override
    public int compareTo(FlowBean o) {
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
}
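Why the no-argument constructor matters: on the reduce side the framework creates value objects reflectively and then fills them from the byte stream. A simplified sketch of that path (not the actual Hadoop source):

// Roughly what Hadoop does when materializing a serialized FlowBean:
FlowBean bean = new FlowBean();   // reflective instantiation -- fails without a no-arg constructor
bean.readFields(in);              // fields are then read back in write() order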
public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Turn this line into a string and split it on tabs.
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");
        try {
            if (fields.length > 3) {
                // Extract the phone number and the up/down traffic fields.
                // Counting from the end keeps the indices correct even when
                // middle fields (such as the URL) are absent.
                String phone = fields[1];
                long upFlow = Long.parseLong(fields[fields.length - 3]);
                long dFlow = Long.parseLong(fields[fields.length - 2]);
                // Emit this line's result: key is the phone number,
                // value is the traffic bean.
                context.write(new Text(phone), new FlowBean(upFlow, dFlow));
            } else {
                return;
            }
        } catch (Exception e) {
            // Silently skip malformed lines.
        }
    }
}
public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Sum the upstream and downstream traffic across all records for this phone.
        long upSum = 0;
        long dSum = 0;
        for (FlowBean bean : values) {
            upSum += bean.getUpFlow();
            dSum += bean.getdFlow();
        }
        FlowBean resultBean = new FlowBean(upSum, dSum);
        context.write(key, resultBean);
    }
}
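A quick hypothetical trace of one reduce call (the numbers are invented): suppose the phone number 13726230503 appears in two input lines with (upFlow, dFlow) of (100, 200) and (300, 400). The shuffle delivers both beans under that one key, and the reducer emits 13726230503 → 400	600	1000, since 100 + 300 = 400 up, 200 + 400 = 600 down, and 400 + 600 = 1000 total.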
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(FlowCount.class);
    job.setMapperClass(FlowCountMapper.class);
    job.setReducerClass(FlowCountReducer.class);

    // Declare the map output and final output key/value types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // Input and output paths are taken from the command line.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    boolean res = job.waitForCompletion(true);
    System.exit(res ? 0 : 1);
}
bin/hadoop fs -mkdir -p /flow/data
bin/hadoop fs -put HTTP_20130313143750.dat /flow/data
bin/hadoop jar ../lx/flow.jar cn/tf/flow/FlowCount /flow/data /flow/output
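After the job finishes, each line of the output follows FlowBean's toString() layout: phone number, upstream traffic, downstream traffic, and total traffic, separated by tabs. A hypothetical output line (the numbers are invented; note that the last column is the sum of the previous two):

13726230503	2481	24681	27162

This is exactly the format the FlowCountSort mapper below parses.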
If you have completed the basic steps above, sorting by total traffic is straightforward. Create a new class, FlowCountSort.java.
The full code is as follows:
package cn.tf.flow;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSort {

    public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are the output of the first job: phone \t up \t down \t sum.
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            String phone = fields[0];
            long upSum = Long.parseLong(fields[1]);
            long dSum = Long.parseLong(fields[2]);
            FlowBean sumBean = new FlowBean(upSum, dSum);
            // The bean is the key, so the shuffle sorts records by total traffic.
            context.write(sumBean, new Text(phone));
        }
    }

    public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        // Each incoming "group" is one phone's traffic bean plus its phone number:
        // because compareTo() never returns 0, no two beans compare as equal, so
        // every record forms its own group and values.iterator().next() is safe.
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(), key);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSort.class);
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
The sort is driven by FlowBean's compareTo(), shown again here:

@Override
public int compareTo(FlowBean o) {
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

Returning -1 when this bean's total is larger places it first, giving descending order. Package this class into a jar the same way as before, then run it with the corresponding hadoop command.
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort /flow/output /flow/sortoutput

The output in /flow/sortoutput now lists users in descending order of total traffic.
The summarized traffic results now need to be written out to different result files by province, which requires solving two problems:
1. How to make the MR job produce multiple result files. Principle: the number of result files is determined by the number of reduce tasks, one file per task. Approach: set the number of reduce tasks in the driver code.

2. How to route each phone number into the correct file. Principle: if each phone number's records are sent to the right reduce task, they end up in the right result file. This means customizing MR's partitioning mechanism (by default, records are partitioned by the key's hashcode modulo the number of reduce tasks; see the sketch after this list). Approach: write a custom implementation of Partitioner.
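For reference, the default behavior just described is essentially what Hadoop's built-in HashPartitioner does (paraphrased, not the verbatim source):

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Mask off the sign bit so the result is non-negative,
    // then take the hash modulo the number of reduce tasks.
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}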
The main code is very similar to the sorting example above; just add the following two lines to the main() method.
// Specify the custom partitioner, and match the number of reduce tasks
// to the number of partitions it produces.
job.setPartitionerClass(ProvincePartioner.class);
job.setNumReduceTasks(5);
package cn.tf.flow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartioner extends Partitioner<Text, FlowBean> {

    // Map phone-number prefixes to partition numbers.
    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Route by the first three digits of the phone number;
        // any unrecognized prefix falls into partition 4.
        String prefix = key.toString().substring(0, 3);
        Integer partNum = provinceMap.get(prefix);
        if (partNum == null) {
            partNum = 4;
        }
        return partNum;
    }
}
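With job.setNumReduceTasks(5), the output directory should contain five part files, one per partition. A hypothetical listing (the path /flow/provinceoutput is illustrative; use whatever you pass as args[1]):

bin/hadoop fs -ls /flow/provinceoutput
part-r-00000   # phone numbers starting with 135
part-r-00001   # 136
part-r-00002   # 137
part-r-00003   # 138
part-r-00004   # every other prefix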
That completes the user traffic analysis system. For more on big data, you are welcome to follow the author. Thanks for your support!