hadoop 氣象數據

import java.io.IOException;  
 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.io.IntWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
 

 
   
 public class MaxTemperature {  
     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{  
         if(args.length != 2){  
             System.err.println("Usage: MaxTemperature <input path> <output path>");  
             System.exit(-1);  
         }  
           
         Configuration conf = new Configuration();  
           
         Job job = new Job(conf, "Max Temperature");  
           
         job.setJarByClass(MaxTemperature.class);  
   
         job.setMapperClass(MaxTemperatureMapper.class);  
         job.setReducerClass(MaxTemperatureReducer.class);  
           
         job.setOutputKeyClass(Text.class);  
         job.setOutputValueClass(IntWritable.class);  
           
         FileInputFormat.addInputPath(job, new Path(args[0]));  
         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
           
         System.exit(job.waitForCompletion(true) ? 0 : 1);  
     }  
 }

import java.io.IOException;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.Text;  
    
    import org.apache.hadoop.mapreduce.Mapper;  
   
   
      
public class MaxTemperatureMapper extends Mapper<Object, Text, Text, IntWritable>{  
      
      private static final int MISSING = 9999;
      public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
      String line = value.toString();
      String year = line.substring(15,19);
      int airTemperature;
      if(line.charAt(87)=='+'){
    airTemperature = Integer.parseInt(line.substring(88,92));
   }else{
    airTemperature = Integer.parseInt(line.substring(87,92));
   }
      String quality = line.substring(92,93);
     if(airTemperature != MISSING && quality.matches("[01459]")){
        context.write(new Text(year), new IntWritable(airTemperature));
       }
      }
      
}

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  
   
     public void reduce(Text key, Iterable<IntWritable> values,Context context)   
             throws IOException, InterruptedException{  
         int maxValue = Integer.MIN_VALUE;  
         for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());
          }
         context.write(key, new IntWritable(maxValue));
     }  
 }
相關文章
相關標籤/搜索