對下面一組氣溫數據進行處理,獲得每月份最高的兩個氣溫值windows
2018-12-12 14:30 25c
2018-12-12 15:30 26c
2017-12-12 12:30 36c
2019-01-01 14:30 22c
2018-05-05 15:30 26c
2018-05-26 15:30 37c
2018-05-06 15:30 36c
2018-07-05 15:30 36c
2018-07-05 12:30 40c
2017-12-15 12:30 16capp
輸出格式以下:ide
2019-1 22
2018-12 26
2018-12 25
2018-7 40
2018-7 36
2018-5 37
2018-5 36
2017-12 36
2017-12 16oop
public class App { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(true); conf.set("fs.defaultFS","hdfs://hadoop01:9000");
//windows下面運行添加一下兩個配置 conf.set("mapreduce.app-submission.cross-platform","true"); conf.set("mapreduce.framework.name","local"); Job job = Job.getInstance(conf); //設置jobName job.setJobName("myJob"); job.setJarByClass(App.class); //配置map //mapper類 job.setMapperClass(MyMapperClass.class); //輸出的key類型 job.setMapOutputKeyClass(TQ.class); //輸出的value類型 job.setMapOutputValueClass(IntWritable.class); //將輸出的(K,V)=>(K,V,P) //job.setPartitionerClass(MyPartitioner.class); //數據在內存spill(溢寫)以前先排序,注:繼承WritableComparator job.setSortComparatorClass(MySortComparator.class); //配置reduce //根據需求肯定分組的維度,繼承自WritableComparator job.setGroupingComparatorClass(MyGrouping.class); //如map階段根據年、月、溫度三個維度排序,而reduce只根據年、月兩個維度 job.setReducerClass(MyReduce.class); Path input=new Path("/input/weather.txt"); Path out=new Path("/output/weather"); if(out.getFileSystem(conf).exists(out)){ out.getFileSystem(conf).delete(out,true); } //數據來源 HDFS路徑 FileInputFormat.addInputPath(job,input); //計算結果的輸出目錄 FileOutputFormat.setOutputPath(job,out); //job.setNumReduceTasks(2); job.waitForCompletion(true); } }
public class TQ implements WritableComparable<TQ> { private int year; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getDay() { return day; } public void setDay(int day) { this.day = day; } public int getTemp() { return temp; } public void setTemp(int temp) { this.temp = temp; } private int month; private int day; /** 溫度 */ private int temp; @Override public int compareTo(TQ other) { int c1= Integer.compare(this.getYear(),other.getYear()); if(c1==0){ return Integer.compare(this.getMonth(),other.getMonth()); } return c1; } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.year); out.writeInt(this.month); out.writeInt(this.day); out.writeInt(this.temp); } @Override public void readFields(DataInput in) throws IOException { this.year=in.readInt(); this.month=in.readInt(); this.day=in.readInt(); this.temp=in.readInt(); } }
/** * 根據年-月對map輸出進行分組 */ public class MyGrouping extends WritableComparator { public MyGrouping(){ super(TQ.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { TQ tq1 = (TQ) a; TQ tq2 = (TQ) b; if (tq1.getYear() == tq2.getYear() && tq1.getMonth() == tq2.getMonth()) { return 0; } return 1; } }
public class MyMapperClass extends Mapper<LongWritable,Text,TQ, IntWritable> { TQ tq=new TQ(); IntWritable outVal=new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[]splits= value.toString().split(" "); String[]date=splits[0].split("-"); tq.setYear(Integer.parseInt(date[0])); tq.setMonth(Integer.parseInt(date[1])); tq.setDay(Integer.parseInt(date[2])); tq.setTemp(Integer.parseInt(splits[2].replace("c",""))); outVal.set(tq.getTemp()); context.write(tq,outVal); } }
public class MyReduce extends Reducer<TQ, IntWritable, Text,IntWritable> { Text txtKey=new Text(); @Override protected void reduce(TQ key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { Iterator<IntWritable> iterator = values.iterator(); int flag=0; while (iterator.hasNext()) { if (flag == 2) { break; } txtKey.set(String.format("%s-%s",key.getYear(),key.getMonth())); IntWritable next = iterator.next(); context.write(txtKey,next); flag++; } } }
/** 數據在內存spill(溢寫)以前先排序,根據年月溫度 */ public class MySortComparator extends WritableComparator { public MySortComparator(){ super(TQ.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { TQ tq1=(TQ)a; TQ tq2=(TQ)b; int c1= Integer.compare(tq1.getYear(),tq2.getYear()); if(c1==0){ int c2=Integer.compare(tq1.getMonth(),tq2.getMonth()); if (c2 == 0) { return -Integer.compare(tq1.getTemp(),tq2.getTemp()); } return -c2; } return -c1; } }