import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{ if(args.length != 2){ System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Configuration conf = new Configuration(); Job job = new Job(conf, "Max Temperature"); job.setJarByClass(MaxTemperature.class); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends Mapper<Object, Text, Text, IntWritable>{ private static final int MISSING = 9999; public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line = value.toString(); String year = line.substring(15,19); int airTemperature; if(line.charAt(87)=='+'){ airTemperature = Integer.parseInt(line.substring(88,92)); }else{ airTemperature = Integer.parseInt(line.substring(87,92)); } String quality = line.substring(92,93); if(airTemperature != MISSING && quality.matches("[01459]")){ context.write(new Text(year), new IntWritable(airTemperature)); } } }
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }