First, as shown in the figure, create one package containing the four classes below.
FlowBean
```java
package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB; // phone number
    private long up_flow;   // upstream traffic
    private long d_flow;    // downstream traffic
    private long s_flow;    // total traffic

    // Deserialization uses reflection to call the no-arg constructor,
    // so one is defined explicitly.
    public FlowBean() {}

    // A constructor with arguments, for convenient initialization.
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() { return phoneNB; }
    public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; }
    public long getUp_flow() { return up_flow; }
    public void setUp_flow(long up_flow) { this.up_flow = up_flow; }
    public long getD_flow() { return d_flow; }
    public void setD_flow(long d_flow) { this.d_flow = d_flow; }
    public long getS_flow() { return s_flow; }
    public void setS_flow(long s_flow) { this.s_flow = s_flow; }

    // Serialize the object's fields into the stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the object's fields from the stream.
    // Fields must be read in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Sort by total traffic in descending order; return 0 for equal
    // values so the compareTo contract holds.
    @Override
    public int compareTo(FlowBean o) {
        if (s_flow == o.getS_flow()) return 0;
        return s_flow > o.getS_flow() ? -1 : 1;
    }
}
```
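To see the serialization contract in action, here is a minimal round-trip sketch (not part of the original project; the phone number and traffic values are made up). It writes a FlowBean to an in-memory byte stream and reads it back, which only works because readFields consumes the fields in exactly the order write produced them:

```java
package cn.itcast.hadoop.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample values.
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize into an in-memory byte stream.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize from the same bytes; field order must match write().
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // prints (tab-separated): 2481  24681  27162
    }
}
```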
FlowSumMapper
```java
package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * FlowBean is our custom data type. For it to be transferred between nodes
 * in Hadoop, it must follow Hadoop's serialization mechanism, i.e. implement
 * the corresponding Hadoop serialization interface.
 *
 * @author duanhaitao@itcast.cn
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Take one line of the log, split it into fields, extract the ones we
    // need (phone number, upstream traffic, downstream traffic), then wrap
    // them as a key-value pair and emit it.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Grab one line of input.
        String line = value.toString();

        // Split it into fields.
        String[] fields = StringUtils.split(line, "\t");

        // Pick out the fields we need.
        String phoneNB = fields[1];
        long u_flow = Long.parseLong(fields[7]);
        long d_flow = Long.parseLong(fields[8]);

        // Wrap the data as a key-value pair and emit it.
        context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
    }
}
```
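To make the field indexes concrete, here is a standalone sketch with a made-up log line laid out to match the indexes the mapper uses (the real log format is an assumption here: token 1 is the phone number, tokens 7 and 8 are the upstream and downstream traffic). Note that commons-lang's StringUtils.split collapses consecutive delimiters, unlike String.split, so the indexes count non-empty tokens:

```java
package cn.itcast.hadoop.mr.flowsum;

import org.apache.commons.lang.StringUtils;

public class MapperFieldDemo {
    public static void main(String[] args) {
        // Hypothetical log line in the assumed layout.
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC"
                + "\t120.196.100.82\ti02.c.aliimg.com\t24\t27\t2481\t24681\t200";

        String[] fields = StringUtils.split(line, "\t");
        System.out.println(fields[1]); // 13726230503 (phone number)
        System.out.println(fields[7]); // 2481  (upstream traffic)
        System.out.println(fields[8]); // 24681 (downstream traffic)
    }
}
```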
FlowSumReducer
```java
package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // The framework calls our reduce method once per group of data, e.g.
    // <1387788654, {flowbean, flowbean, flowbean, ...}>.
    // The business logic in reduce is to iterate over the values,
    // accumulate the sums, and emit the result.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {

        long up_flow_counter = 0;
        long d_flow_counter = 0;

        for (FlowBean bean : values) {
            up_flow_counter += bean.getUp_flow();
            d_flow_counter += bean.getD_flow();
        }

        context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    }
}
```
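For intuition, here is a minimal standalone sketch (hypothetical phone number and traffic values) of what a single reduce call computes for one key's group of FlowBeans:

```java
package cn.itcast.hadoop.mr.flowsum;

import java.util.Arrays;
import java.util.List;

public class ReduceLogicDemo {
    public static void main(String[] args) {
        // Hypothetical group for key "13877886543": one FlowBean per log line.
        List<FlowBean> values = Arrays.asList(
                new FlowBean("13877886543", 120, 200),
                new FlowBean("13877886543", 80, 300));

        long up = 0, down = 0;
        for (FlowBean bean : values) {
            up += bean.getUp_flow();
            down += bean.getD_flow();
        }

        // Same as what the reducer emits (tab-separated): 200  500  700
        System.out.println(new FlowBean("13877886543", up, down));
    }
}
```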
FlowSumRunner
```java
package cn.itcast.hadoop.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// The standard pattern for describing and submitting a job.
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner, so generic options
        // passed on the command line take effect.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}
```
Then package the project into a jar.
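If you are not exporting the jar from the IDE, a minimal command-line equivalent (assuming the compiled classes sit under bin/) would be:

```
jar -cvf flow.jar -C bin .
```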
In the terminal, start HDFS and YARN.
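Assuming a standard Hadoop 2.x layout with $HADOOP_HOME/sbin on the PATH, the usual start scripts are:

```
start-dfs.sh
start-yarn.sh
```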
Upload the jar from the Windows client to the cluster:

put c:/flow.jar
Put the input data into HDFS.
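For example (the file name flow.log and the HDFS path /flow/data are hypothetical; the input path just has to match the job's first argument):

```
hadoop fs -put flow.log /flow/data
```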
Run the job.
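A typical submission looks like the following; the input and output paths are hypothetical and map to args[0] and args[1] in FlowSumRunner (the output directory must not already exist):

```
hadoop jar flow.jar cn.itcast.hadoop.mr.flowsum.FlowSumRunner /flow/data /flow/output
```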
Result: