A demo of developing a flow-sum MR program

First, create one package, cn.itcast.hadoop.mr.flowsum, containing the four classes shown below: FlowBean, FlowSumMapper, FlowSumReducer, and FlowSumRunner.

FlowBean

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{
	
	
	private String phoneNB; //phone number
	private long up_flow;   //upstream traffic
	private long d_flow;    //downstream traffic
	private long s_flow;    //total traffic
	
	//Deserialization uses reflection, which needs a no-arg constructor, so we define one explicitly
	public FlowBean(){}
	
	//For convenient initialization of the object's data, add a parameterized constructor
	public FlowBean(String phoneNB, long up_flow, long d_flow) {
		this.phoneNB = phoneNB;
		this.up_flow = up_flow;
		this.d_flow = d_flow;
		this.s_flow = up_flow + d_flow;
	}

	public String getPhoneNB() {
		return phoneNB;
	}

	public void setPhoneNB(String phoneNB) {
		this.phoneNB = phoneNB;
	}

	public long getUp_flow() {
		return up_flow;
	}

	public void setUp_flow(long up_flow) {
		this.up_flow = up_flow;
	}

	public long getD_flow() {
		return d_flow;
	}

	public void setD_flow(long d_flow) {
		this.d_flow = d_flow;
	}

	public long getS_flow() {
		return s_flow;
	}

	public void setS_flow(long s_flow) {
		this.s_flow = s_flow;
	}

	
	
	//Serialize the object's data into the stream
	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(phoneNB);
		out.writeLong(up_flow);
		out.writeLong(d_flow);
		out.writeLong(s_flow);
		
	}

	
	//Deserialize the object's data from the stream
	//Fields must be read back in exactly the same order they were written
	@Override
	public void readFields(DataInput in) throws IOException {

		phoneNB = in.readUTF();
		up_flow = in.readLong();
		d_flow = in.readLong();
		s_flow = in.readLong();
		
	}
	
	
	@Override
	public String toString() {

		return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
	}

	//Sort by total flow in descending order (never returns 0, so it is meant for ordering only)
	@Override
	public int compareTo(FlowBean o) {
		return s_flow > o.getS_flow() ? -1 : 1;
	}
	

}
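Because readFields must read the fields back in exactly the order write wrote them, a quick sanity check is a serialization round trip. Below is a minimal standalone sketch (a hypothetical test class, not part of the original demo; the sample values are made up):

package cn.itcast.hadoop.mr.flowsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTripDemo {

	public static void main(String[] args) throws Exception {
		//made-up sample values
		FlowBean original = new FlowBean("13800000000", 100, 200);

		//serialize with write()
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		//deserialize with readFields() into a fresh bean (this is where the no-arg constructor matters)
		FlowBean copy = new FlowBean();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		//prints: 13800000000	100	200	300
		System.out.println(copy.getPhoneNB() + "\t" + copy);
	}
}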

FlowSumMapper

package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * FlowBean is a custom data type. For it to be transferred between Hadoop nodes it must follow
 * Hadoop's serialization mechanism, i.e. implement the corresponding serialization interface (here, WritableComparable)
 * @author duanhaitao@itcast.cn
 *
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

	
	//Take one line of the log, split it into fields, extract the ones we need (phone number, up flow, down flow), then wrap them as a k/v pair and emit it
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {

		//grab one line of data
		String line = value.toString();
		//split it into individual fields
		String[] fields = StringUtils.split(line, "\t");
		
		//pick out the fields we need
		String phoneNB = fields[1];
		long u_flow = Long.parseLong(fields[7]);
		long d_flow = Long.parseLong(fields[8]);
		
		//wrap the data as a k/v pair and emit it
		context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
		
	}
	
	
}
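The column positions used above (fields[1] for the phone number, fields[7] and fields[8] for the traffic) depend entirely on the layout of your log file. A minimal standalone sketch for dry-running the parsing logic on one line before submitting the job (the sample line below is made up, with a layout matching those assumptions):

package cn.itcast.hadoop.mr.flowsum;

import org.apache.commons.lang.StringUtils;

public class MapParseDemo {

	public static void main(String[] args) {
		//made-up log line: tab-separated, phone number at index 1, flow values at indexes 7 and 8
		String line = "1363157985066\t13800000000\t00-00-00\t10.0.0.1\texample.com\t24\t27\t2481\t24681\t200";

		String[] fields = StringUtils.split(line, "\t");
		String phoneNB = fields[1];              //"13800000000"
		long u_flow = Long.parseLong(fields[7]); //2481
		long d_flow = Long.parseLong(fields[8]); //24681

		//the same key/value pair the mapper above would emit for this line
		System.out.println(phoneNB + "\t" + new FlowBean(phoneNB, u_flow, d_flow));
	}
}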

 

FlowSumReducer

package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	
	
	//The framework calls our reduce method once per key group, e.g. <1387788654, {flowbean, flowbean, flowbean, ...}>
	//The business logic in reduce is to iterate over the values, accumulate the sums, and output the result
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values,Context context)
			throws IOException, InterruptedException {

		long up_flow_counter = 0;
		long d_flow_counter = 0;
		
		for(FlowBean bean : values){
			
			up_flow_counter += bean.getUp_flow();
			d_flow_counter += bean.getD_flow();
			
		}
		
		
		context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
		
		
	}

}
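To make the "one reduce call per key group" idea concrete, the standalone sketch below mimics what the framework does after the shuffle: it groups beans by phone number and sums each group the same way the reducer above does (illustration only, made-up values):

package cn.itcast.hadoop.mr.flowsum;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceGroupingDemo {

	public static void main(String[] args) {
		//pretend this is the shuffled mapper output, already grouped by key (made-up values)
		Map<String, List<FlowBean>> groups = new HashMap<String, List<FlowBean>>();
		List<FlowBean> beans = new ArrayList<FlowBean>();
		beans.add(new FlowBean("13800000000", 100, 200));
		beans.add(new FlowBean("13800000000", 300, 400));
		groups.put("13800000000", beans);

		//one "reduce call" per key: iterate the values and accumulate the sums
		for (Map.Entry<String, List<FlowBean>> entry : groups.entrySet()) {
			long up_flow_counter = 0;
			long d_flow_counter = 0;
			for (FlowBean bean : entry.getValue()) {
				up_flow_counter += bean.getUp_flow();
				d_flow_counter += bean.getD_flow();
			}
			//prints: 13800000000	400	600	1000
			System.out.println(entry.getKey() + "\t" + new FlowBean(entry.getKey(), up_flow_counter, d_flow_counter));
		}
	}
}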

 

FlowSumRunner

package cn.itcast.hadoop.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//The standard way to describe and submit a job
public class FlowSumRunner extends Configured implements Tool{

	@Override
	public int run(String[] args) throws Exception {
		
		//use the Configuration already populated by ToolRunner so generic options (-D, -files, ...) are honored
		Configuration conf = getConf();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowSumRunner.class);
		
		job.setMapperClass(FlowSumMapper.class);
		job.setReducerClass(FlowSumReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		
		return job.waitForCompletion(true)?0:1;
	}
	
	
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
		System.exit(res);
	}

}
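One practical note: FileOutputFormat fails the job if the output directory already exists. A common convenience is to delete it inside run() right before setOutputPath is called. The snippet below sketches that optional addition (it assumes an extra import of org.apache.hadoop.fs.FileSystem and is not part of the original demo):

		//optional: delete the output path if it already exists, so reruns don't fail
		Path outPath = new Path(args[1]);
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(outPath)) {
			fs.delete(outPath, true); //true = delete recursively
		}
		FileOutputFormat.setOutputPath(job, outPath);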

 

Then package the program into a jar (flow.jar).

In a terminal, start HDFS and YARN.

Upload the jar to the cluster node, e.g. with put c:/flow.jar from the local terminal's file-transfer tool.

Put the input data into HDFS, for example hadoop fs -put flow.dat /flow/data (use your own file name and path).

Run the job, e.g. hadoop jar flow.jar cn.itcast.hadoop.mr.flowsum.FlowSumRunner /flow/data /flow/output.

When it finishes, view the result, e.g. hadoop fs -cat /flow/output/part-r-00000: one line per phone number, with up flow, down flow, and total flow separated by tabs.
