用 Hadoop 統計 A call B 的次數

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class CallCount {
	public static class BILLING_TERM_Mapper extends 
	    Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
		throws IOException, InterruptedException {

	String line = value.toString();
	String details[] = line.split("	");
	String BILLING = details[0];
	String TERM = details[1];
	String BILLING_TERM_String = BILLING + "-" + TERM;
	word.set(BILLING_TERM_String);

	if(!"null".equals(BILLING) && !"".equals(BILLING)){
		context.write(word, one);
	}
  }
 }

   public static class BILLING_TERM_Reducer extends
	  Reducer<Text, IntWritable, Text, IntWritable> {
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values,
		  Context context) throws IOException, InterruptedException {
	  
	  int sum = 0;
	  for (IntWritable val : values) {
		  sum += val.get();
	  }
	  result.set(sum);
	  context.write(key, result);
  }
 }
   public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2){
			System.err.println("Usage: callcount ");
			System.exit(2);
		}
		/** Create A Job, Give A name, Tracking It**/
		Job job = new Job(conf, "Call Count");
		job.setJarByClass(CallCount.class);
		job.setMapperClass(BILLING_TERM_Mapper.class);
		job.setCombinerClass(BILLING_TERM_Reducer.class);
		job.setReducerClass(BILLING_TERM_Reducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

數據:
 
用hadoop統計A call B的次數(原創) - zarchary-10 - zarchary的博客
 
 
結果:
 
 

  
  
  
  

 
用hadoop統計A call B的次數(原創) - zarchary-10 - zarchary的博客
相關文章
相關標籤/搜索