一個自動修正數據時間和補全缺失數據的MapReduce程序

原始數據如下圖:

程序:

Mapper類:

 1 public class DemoMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
 2     IntWritable k = new IntWritable();
 3     Text v = new Text();
 4     
 5     @Override
 6     protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,IntWritable,Text>.Context context)
 7             throws IOException,InterruptedException{
 8         String[] data = value.toString().split(",");
 9         k.set(Integer.parseInt(data[0]));
10         try{
11             v.set(Utils.getFixTime(data[1]));
12             context.write(k,v);
13         }catch(ParseException e){
14             e.printStackTrace();
15         }
16             }
17 }

 

Reducer類:

 1 public class DemoReducer extends Reducer<IntWritable,Text,NullWritable,Text>{
 2     Text v = new Text();
 3     
 4     @Override
 5     protected void reduce(IntWritable key,Iterable<Text> values,Reducer<IntWritable,Text,NullWritable,Text>.Context context)
 6             throws IOException,InterruptedException{
 7         TreeSet<Long> timeSet = new TreeSet<>();
 8         for(Text value : values){
 9             try{
10                 timeSet.add(getTime(value.toString()));
11             }catch{
12                 e.printStackTrace();
13             }
14         }
15         long tmp = -1;
16         for(long time :timeSet){
17             if(tmp == -1){
18                 v.set(key.toString()+","+getDate(time));
19                 context.write(NullWritable.get(),v);
20             }else{
21                 if(time - tmp > 900000){
22                     forint i=0;i<= (time - tmp)/900000;i++){
23                         v.set(key.toString()+","+getDate(tmp+900000*i));
24                     }
25                 }else{
26                     v.set(key.toString()+","+getDate(time));
27                     context.write(NullWritable.get(),v);
28                 }
29             }
30             tmp =time;
31         }
32             }
33 public static long getTime(String str)throws ParseException{
34     SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
35     return simpleDateFormat.parse(str).getTime();
36 }
37 
38 public static String getDate(long timetmp){
39     SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
40     return simpleDateFormat.format(timeStamp);
41 }
42 }

 

Driver類:

 1 public class DemoDriver{
 2     public static void main(String[] args)throws IllegalArgumentException,IOException,ClassNotFoundException,InterruptedException{
 3         if(args.length <2){
 4             System.err.println("you must input two argument!");
 5             System.exit(-1);
 6         }
 7         Configuration conf = Utils.getConf();
 8         Job job =Job.getInstance(conf, "fix time");
 9         job.setJarByClass(DemoDriver.class);
10         job.setMapperClass(DemoMapper.class);
11         job.setReducerClass(DemoReducer.class);
12         job.setMapOutputKeyClass(IntWritable.class);
13         job.setMapOutputValueClass(Text.class);
14         job.setOutputKeyClass(NullWritable.class);
15         job.setOutputValueClass(Text.class);
16         job.setNumReduceTask(1);
17         for(int i =0;i <args.length-1;i++){
18             FileInputFormat.addInputPath(job,new Path(args[i]));
19         }
20         FileSystem.get(conf).delete(new Path(args[args.length-1]),true);
21         FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
22         System.exit(job.waitForCompletion(true)?0:1);
23     }
24 }
相關文章
相關標籤/搜索