mapreduce文件匹配相同數據多文件輸出

package mapper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;




public class deviceApp extends Configured implements Tool{
  public static class cleanMap extends Mapper<LongWritable, Text, Text, deviceAppWritable>{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
      if(!value.toString().split("\\$")[0].contains("null")){	  
	  final String[] splited = value.toString().split("\\$");
	  final String atetid = splited[0];
	  final Text k2 = new Text(atetid);
	  final deviceAppWritable v2 = new deviceAppWritable(splited[1],splited[4],splited[5],splited[7],splited[16],splited[18],splited[19],splited[20]);
	  context.write(k2, v2);
	  }
	}
}
  
  
  public static class cleanReduce extends Reducer<Text, deviceAppWritable, Text, deviceAppWritable>{
	  
	  private MultipleOutputs mos; 
		
		protected void setup(
				Reducer<Text, deviceAppWritable, Text, deviceAppWritable>.Context context)
				throws IOException, InterruptedException {
              mos = new MultipleOutputs(context);
		}
		
	protected void reduce(Text k2, Iterable<deviceAppWritable> v2s,Context context) throws IOException ,InterruptedException {
		String deviceid;
		String deviceType;
		String versioncode;
		String packagename;
		String channelid;
		String clientip;
		String createtime;
		String state;
	   for(deviceAppWritable deviceAppWritable : v2s){
			deviceid = deviceAppWritable.deviceid;
			deviceType = deviceAppWritable.deviceType;
			versioncode = deviceAppWritable.versioncode;
			packagename = deviceAppWritable.packagename;
			channelid = deviceAppWritable.channelid;
			clientip = deviceAppWritable.clientip;
			createtime = deviceAppWritable.createtime;
			state = deviceAppWritable.state;
			if(packagename.equals("com.sxhl.tcltvmarket")||packagename.equals("com.atet.tvmarket")||packagename.equals("com.sxhl.market")||packagename.equals("")||packagename==null){
				mos.write(k2, new deviceAppWritable(deviceid, deviceType,versioncode,packagename,channelid,clientip,createtime,
						state), "AtetMarket");
			}else if(packagename.equals("com.atet.familytime.tv")){
				mos.write(k2, new deviceAppWritable(deviceid, deviceType,versioncode,packagename,channelid,clientip,createtime,
						state),"AtetFamily");
			}
		}
  }
	@Override
	protected void cleanup(
			Reducer<Text, deviceAppWritable, Text, deviceAppWritable>.Context context)
			throws IOException, InterruptedException {
            mos.close();
	}
	
 }
  
  
  //job啓動
	@SuppressWarnings("deprecation")
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf=new Configuration();
		String[]argArray=new GenericOptionsParser(conf, args).getRemainingArgs();  
      if(argArray.length!=2){  
         System.out.println("請提供兩個參數");  
          System.exit(1);  
      }       
        Job job=Job.getInstance(conf, "deviceApp");
        FileSystem fs = FileSystem.get(new URI(args[1]), conf);
 	    fs.delete(new Path(args[1]));
		job.setJarByClass(deviceApp.class); 
		job.setMapperClass(cleanMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(deviceAppWritable.class);
		job.setReducerClass(cleanReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(deviceAppWritable.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		MultipleOutputs.addNamedOutput(job, "AtetMarket", TextOutputFormat.class, Text.class, deviceAppWritable.class); 
		MultipleOutputs.addNamedOutput(job, "AtetFamily", TextOutputFormat.class, Text.class, deviceAppWritable.class); 
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));  
		job.waitForCompletion(true);
		return 0;
	}
	
	//main方法啓動調用run方法的job
	 public static void main(String [] args) throws IOException, Exception{
				   ToolRunner.run(new deviceApp(), args);
	}	
}


// Value type that handles format conversion when records are read and written.
class deviceAppWritable implements Writable{
		String deviceid;
		String deviceType;
		String versioncode;
		String packagename;
		String channelid;
		String clientip;
		String createtime;
	    String state;
	    
	public  deviceAppWritable(){}
	
	public deviceAppWritable(String deviceid,String deviceType,String versioncode,String packagename,String channelid,String clientip,String createtime,String state){
	    this.deviceid = deviceid;
	    this.deviceType = deviceType;
	    this.versioncode = versioncode;
		this.packagename = packagename;
		this.channelid = channelid;
		this.clientip = clientip;
		this.createtime = createtime;
		this.state=state;
	}
	
 	
	
	//讀取文件類型進行格式轉化。
	public void readFields(DataInput in) throws IOException {
		this.deviceid = in.readUTF();
		this.deviceType = in.readUTF();
		this.versioncode = in.readUTF();
		this.packagename = in.readUTF();
		this.channelid = in.readUTF();
		this.clientip = in.readUTF();
		this.createtime = in.readUTF();
		this.state = in.readUTF();
	}

	
	//寫入文件
	public void write(DataOutput out) throws IOException {
		out.writeUTF(deviceid);
		out.writeUTF(deviceType);
		out.writeUTF(versioncode);
		out.writeUTF(packagename);
		out.writeUTF(channelid);
		out.writeUTF(clientip);
		out.writeUTF(createtime);
		out.writeUTF(state);
	}
	
	@Override
	public String toString() {
		return deviceid + "\t" +deviceType + "\t" +versioncode + "\t" +packagename + "\t" + channelid + "\t" + clientip + "\t" + createtime + "\t" +state + "\t";
	}
}
相關文章
相關標籤/搜索