package mapper; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class deviceApp extends Configured implements Tool{ public static class cleanMap extends Mapper<LongWritable, Text, Text, deviceAppWritable>{ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ if(!value.toString().split("\\$")[0].contains("null")){ final String[] splited = value.toString().split("\\$"); final String atetid = splited[0]; final Text k2 = new Text(atetid); final deviceAppWritable v2 = new deviceAppWritable(splited[1],splited[4],splited[5],splited[7],splited[16],splited[18],splited[19],splited[20]); context.write(k2, v2); } } } public static class cleanReduce extends Reducer<Text, deviceAppWritable, Text, deviceAppWritable>{ private MultipleOutputs mos; protected void setup( Reducer<Text, deviceAppWritable, Text, deviceAppWritable>.Context context) throws IOException, InterruptedException { mos = new MultipleOutputs(context); } protected void reduce(Text k2, Iterable<deviceAppWritable> v2s,Context context) throws IOException ,InterruptedException { String deviceid; String deviceType; String versioncode; String packagename; String channelid; String clientip; String createtime; String state; for(deviceAppWritable deviceAppWritable : v2s){ deviceid = deviceAppWritable.deviceid; deviceType = deviceAppWritable.deviceType; versioncode = deviceAppWritable.versioncode; packagename = deviceAppWritable.packagename; channelid = deviceAppWritable.channelid; clientip = deviceAppWritable.clientip; createtime = deviceAppWritable.createtime; state = deviceAppWritable.state; if(packagename.equals("com.sxhl.tcltvmarket")||packagename.equals("com.atet.tvmarket")||packagename.equals("com.sxhl.market")||packagename.equals("")||packagename==null){ mos.write(k2, new deviceAppWritable(deviceid, deviceType,versioncode,packagename,channelid,clientip,createtime, state), "AtetMarket"); }else if(packagename.equals("com.atet.familytime.tv")){ mos.write(k2, new deviceAppWritable(deviceid, deviceType,versioncode,packagename,channelid,clientip,createtime, state),"AtetFamily"); } } } @Override protected void cleanup( Reducer<Text, deviceAppWritable, Text, deviceAppWritable>.Context context) throws IOException, InterruptedException { mos.close(); } } //job啓動 @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { Configuration conf=new Configuration(); String[]argArray=new GenericOptionsParser(conf, args).getRemainingArgs(); if(argArray.length!=2){ System.out.println("請提供兩個參數"); System.exit(1); } Job job=Job.getInstance(conf, "deviceApp"); FileSystem fs = FileSystem.get(new URI(args[1]), conf); fs.delete(new Path(args[1])); job.setJarByClass(deviceApp.class); job.setMapperClass(cleanMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(deviceAppWritable.class); job.setReducerClass(cleanReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(deviceAppWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); MultipleOutputs.addNamedOutput(job, "AtetMarket", TextOutputFormat.class, Text.class, deviceAppWritable.class); MultipleOutputs.addNamedOutput(job, "AtetFamily", TextOutputFormat.class, Text.class, deviceAppWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); return 0; } //main方法啓動調用run方法的job public static void main(String [] args) throws IOException, Exception{ ToolRunner.run(new deviceApp(), args); } } //讀取文件、寫入文件格式轉化 class deviceAppWritable implements Writable{ String deviceid; String deviceType; String versioncode; String packagename; String channelid; String clientip; String createtime; String state; public deviceAppWritable(){} public deviceAppWritable(String deviceid,String deviceType,String versioncode,String packagename,String channelid,String clientip,String createtime,String state){ this.deviceid = deviceid; this.deviceType = deviceType; this.versioncode = versioncode; this.packagename = packagename; this.channelid = channelid; this.clientip = clientip; this.createtime = createtime; this.state=state; } //讀取文件類型進行格式轉化。 public void readFields(DataInput in) throws IOException { this.deviceid = in.readUTF(); this.deviceType = in.readUTF(); this.versioncode = in.readUTF(); this.packagename = in.readUTF(); this.channelid = in.readUTF(); this.clientip = in.readUTF(); this.createtime = in.readUTF(); this.state = in.readUTF(); } //寫入文件 public void write(DataOutput out) throws IOException { out.writeUTF(deviceid); out.writeUTF(deviceType); out.writeUTF(versioncode); out.writeUTF(packagename); out.writeUTF(channelid); out.writeUTF(clientip); out.writeUTF(createtime); out.writeUTF(state); } @Override public String toString() { return deviceid + "\t" +deviceType + "\t" +versioncode + "\t" +packagename + "\t" + channelid + "\t" + clientip + "\t" + createtime + "\t" +state + "\t"; } }