版本號:
CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3)
hadoop多文件格式輸入,通常可以使用MultipleInputs類指定不同的輸入文件路徑以及輸入文件格式。
比如現在有如下的需求:
現有兩份數據:
phone:
123,good number
124,common number
125,bad number
user:
zhangsan,123
lisi,124
wangwu,125
現在需要把user和phone依據電話號碼合併,獲得如下的結果:
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
那麼就可以使用MultipleInputs來操作,這裏把user和phone上傳到hdfs文件夾中,分別是/multiple/user/user , /multiple/phone/phone。
設計的MultipleDriver如下:
package multiple.input; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; /** * input1(/multiple/user/user): * username,user_phone * * input2(/multiple/phone/phone): * user_phone,description * * output: username,user_phone,description * * @author fansy * */ public class MultipleDriver extends Configured implements Tool{ // private Logger log = LoggerFactory.getLogger(MultipleDriver.class); private String input1=null; private String input2=null; private String output=null; private String delimiter=null; public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); // conf.set("fs.defaultFS", "hdfs://node33:8020"); // conf.set("mapreduce.framework.name", "yarn"); // conf.set("yarn.resourcemanager.address", "node33:8032"); ToolRunner.run(conf, new MultipleDriver(), args); } @Override public int run(String[] arg0) throws Exception { configureArgs(arg0); checkArgs(); Configuration conf= getConf(); conf.set("delimiter", delimiter); @SuppressWarnings("deprecation") Job job = new Job(conf, "merge user and phone information "); job.setJarByClass(MultipleDriver.class); job.setReducerClass(MultipleReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlagStringDataType.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(1); MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class); MultipleInputs.addInputPath(job, 
new Path(input2), TextInputFormat.class, Multiple2Mapper.class); FileOutputFormat.setOutputPath(job, new Path(output)); int res = job.waitForCompletion(true) ? 0 : 1; return res; } /** * check the args */ private void checkArgs() { if(input1==null||"".equals(input1)){ System.out.println("no user input..."); printUsage(); System.exit(-1); } if(input2==null||"".equals(input2)){ System.out.println("no phone input..."); printUsage(); System.exit(-1); } if(output==null||"".equals(output)){ System.out.println("no output..."); printUsage(); System.exit(-1); } if(delimiter==null||"".equals(delimiter)){ System.out.println("no delimiter..."); printUsage(); System.exit(-1); } } /** * configuration the args * @param args */ private void configureArgs(String[] args) { for(int i=0;i<args.length;i++){ if("-i1".equals(args[i])){ input1=args[++i]; } if("-i2".equals(args[i])){ input2=args[++i]; } if("-o".equals(args[i])){ output=args[++i]; } if("-delimiter".equals(args[i])){ delimiter=args[++i]; } } } public static void printUsage(){ System.err.println("Usage:"); System.err.println("-i1 input \t user data path."); System.err.println("-i2 input \t phone data path."); System.err.println("-o output \t output data path."); System.err.println("-delimiter data delimiter , default is comma ."); } }
mapper1(處理user數據):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mapper for the user data set.
 *
 * <pre>
 * input : username,phone
 * output: &lt;key,value&gt; --&gt; &lt;[phone],[0,username]&gt;
 * </pre>
 *
 * The flag {@code 0} marks the value as a username.
 *
 * @author fansy
 */
public class Multiple1Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {

    // One shared logger is enough; there is no need to look it up per mapper instance.
    private static final Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);

    /** Field delimiter read from the job configuration; defaults to a comma. */
    private String delimiter = null;

    /**
     * Reads the field delimiter from the job configuration.
     *
     * @param cxt task context that supplies the configuration
     */
    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple1Mapper");
    }

    /**
     * Emits {@code <phone, (0, username)>} for every line that has exactly two fields;
     * other lines are skipped.
     *
     * @param key   position of the line in the input (unused)
     * @param value one line of the form {@code username<delimiter>phone}
     * @param cxt   task context used to emit the pair
     * @throws IOException          if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        // Text.getBytes() returns the whole backing buffer, of which only the first
        // getLength() bytes are valid; decoding all of it can append stale bytes left
        // over from a longer earlier record. toString() decodes only the valid part.
        String info = value.toString();
        // Note: String.split treats the delimiter as a regular expression.
        String[] values = info.split(delimiter);
        if (values.length != 2) {
            return;
        }
        log.info("key-->{}=========value-->[0,{}]", values[1], values[0]);
        cxt.write(new Text(values[1]), new FlagStringDataType(0, values[0]));
    }
}
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mapper for the phone data set.
 *
 * <pre>
 * input : phone,description
 * output: &lt;key,value&gt; --&gt; &lt;[phone],[1,description]&gt;
 * </pre>
 *
 * The flag {@code 1} marks the value as a description.
 *
 * @author fansy
 */
public class Multiple2Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {

    private Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);

    /** Field delimiter read from the job configuration; defaults to a comma. */
    private String delimiter = null;

    /**
     * Picks up the field delimiter from the job configuration.
     *
     * @param cxt task context that supplies the configuration
     */
    @Override
    public void setup(Context cxt) {
        this.delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple2Mapper");
    }

    /**
     * Emits {@code <phone, (1, description)>} for each line made of exactly two
     * fields; any other line produces no output.
     *
     * @param key   position of the line in the input (unused)
     * @param value one line of the form {@code phone<delimiter>description}
     * @param cxt   task context used to emit the pair
     * @throws IOException          if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        final String line = value.toString();
        final String[] fields = line.split(delimiter);
        if (fields.length == 2) {
            final String phone = fields[0];
            final String description = fields[1];
            log.info("key-->" + phone + "=========value-->[1," + description + "]");
            cxt.write(new Text(phone), new FlagStringDataType(1, description));
        }
    }
}
這裏的FlagStringDataType是本身定義的:
package multiple.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

/**
 * Writable pair of an integer flag and a string value.
 *
 * The flag records which input a value came from, so that a reducer receiving
 * values from several mappers can tell them apart.
 */
public class FlagStringDataType implements WritableComparable<FlagStringDataType> {

    // Shared logger: instances are created per record, so a per-instance
    // logger lookup would be repeated for every record.
    private static final Logger log = LoggerFactory.getLogger(FlagStringDataType.class);

    private String value;
    private int flag;

    /** No-arg constructor; leaves the flag at 0 and the value {@code null}. */
    public FlagStringDataType() {
    }

    /**
     * @param flag  marker identifying the origin of the value
     * @param value the payload
     */
    public FlagStringDataType(int flag, String value) {
        this.value = value;
        this.flag = flag;
    }

    /** @return the payload string */
    public String get() {
        return value;
    }

    /** @param value the new payload string */
    public void set(String value) {
        this.value = value;
    }

    /**
     * Two instances are equal when they have the same class, the same flag and
     * equal values.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || !getClass().equals(other.getClass())) {
            return false;
        }
        FlagStringDataType that = (FlagStringDataType) other;
        // Compare string content, not references: deserialized instances never
        // share String objects, so '==' would report equal values as different.
        boolean sameValue = (value == null) ? that.value == null : value.equals(that.value);
        return flag == that.flag && sameValue;
    }

    /** Hash code consistent with {@link #equals(Object)}; a {@code null} value contributes 0. */
    @Override
    public int hashCode() {
        return Ints.hashCode(flag) + (value == null ? 0 : value.hashCode());
    }

    /**
     * Orders by flag first, then by value. A {@code null} value sorts before
     * any non-null value.
     */
    @Override
    public int compareTo(FlagStringDataType other) {
        if (flag != other.flag) {
            return flag < other.flag ? -1 : 1;
        }
        if (value == null) {
            return other.value == null ? 0 : -1;
        }
        if (other.value == null) {
            return 1;
        }
        return value.compareTo(other.value);
    }

    /**
     * Serializes the flag followed by the value.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        log.info("in write()::flag:{},vlaue:{}", flag, value);
        out.writeInt(flag);
        // The value must be non-null here: it is written with writeUTF.
        out.writeUTF(value);
    }

    /**
     * Restores the flag and the value in the order written by {@link #write(DataOutput)}.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Logs the state before it is overwritten by the incoming record.
        log.info("in read()::flag:{},vlaue:{}", flag, value);
        flag = in.readInt();
        value = in.readUTF();
        log.info("in read()::flag:{},vlaue:{}", flag, value);
    }

    /** @return the origin marker */
    public int getFlag() {
        return flag;
    }

    /** @param flag the new origin marker */
    public void setFlag(int flag) {
        this.flag = flag;
    }

    /** @return {@code flag:value} */
    @Override
    public String toString() {
        return flag + ":" + value;
    }
}
這樣做的優點是在reduce端可以依據flag的值來判斷其輸出位置。這種設計方式對多種輸入的整合有很大幫助,在mahout中也可以看到這樣的設計。
reducer(彙總輸出數據):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer that merges the values collected for one phone number into a single
 * line of the form {@code username<delimiter>phone<delimiter>description}.
 *
 * Values flagged {@code 0} are usernames and values flagged {@code 1} are
 * descriptions.
 */
public class MultipleReducer extends Reducer<Text, FlagStringDataType, Text, NullWritable> {

    private static final Logger log = LoggerFactory.getLogger(MultipleReducer.class);

    /** Output field delimiter read from the job configuration; defaults to a comma. */
    private String delimiter = null;

    /**
     * Reads the field delimiter from the job configuration.
     *
     * @param cxt task context that supplies the configuration
     */
    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
    }

    /**
     * Writes one joined line for the given phone number.
     *
     * @param key    the phone number
     * @param values flagged values emitted by the mappers for this phone number
     * @param cxt    task context used to emit the line
     * @throws IOException          if the line cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<FlagStringDataType> values, Context cxt)
            throws IOException, InterruptedException {
        log.info("================");
        log.info(" =======");
        log.info(" ==");

        String username = null;
        String description = null;
        for (FlagStringDataType v : values) {
            int index = v.getFlag();
            log.info("index:" + index + "-->value:" + v.get());
            // Dispatch on the flag explicitly instead of using it as an array
            // index: an unexpected flag must neither overwrite the key nor throw.
            if (index == 0) {
                username = v.get();
            } else if (index == 1) {
                description = v.get();
            } else {
                log.warn("ignoring value with unexpected flag {} for key {}", index, key);
            }
        }

        log.info(" ==");
        log.info(" =======");
        log.info("================");

        // Field order is username, phone, description. If several values carry the
        // same flag the last one wins, and a side with no value is written as the
        // text "null".
        String joined = username + delimiter + key.toString() + delimiter + description;
        cxt.write(new Text(joined), NullWritable.get());
    }
}
下面介紹另一種方式,和上面的相比略有不足,但是可以借鑑。
首先是Driver:
package multiple.input; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; /** * input1(/multiple/user/user): * username,user_phone * * input2(/multiple/phone/phone): * user_phone,description * * output: username,user_phone,description * * @author fansy * */ public class MultipleDriver2 extends Configured implements Tool{ // private Logger log = LoggerFactory.getLogger(MultipleDriver.class); private String input1=null; private String input2=null; private String output=null; private String delimiter=null; public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); // conf.set("fs.defaultFS", "hdfs://node33:8020"); // conf.set("mapreduce.framework.name", "yarn"); // conf.set("yarn.resourcemanager.address", "node33:8032"); ToolRunner.run(conf, new MultipleDriver2(), args); } @Override public int run(String[] arg0) throws Exception { configureArgs(arg0); checkArgs(); Configuration conf= getConf(); conf.set("delimiter", delimiter); @SuppressWarnings("deprecation") Job job = new Job(conf, "merge user and phone information "); job.setJarByClass(MultipleDriver2.class); job.setMapperClass(MultipleMapper.class); job.setReducerClass(MultipleReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlagStringDataType.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(input1)); FileInputFormat.addInputPath(job, new Path(input2)); FileOutputFormat.setOutputPath(job, new 
Path(output)); int res = job.waitForCompletion(true) ? 0 : 1; return res; } /** * check the args */ private void checkArgs() { if(input1==null||"".equals(input1)){ System.out.println("no user input..."); printUsage(); System.exit(-1); } if(input2==null||"".equals(input2)){ System.out.println("no phone input..."); printUsage(); System.exit(-1); } if(output==null||"".equals(output)){ System.out.println("no output..."); printUsage(); System.exit(-1); } if(delimiter==null||"".equals(delimiter)){ System.out.println("no delimiter..."); printUsage(); System.exit(-1); } } /** * configuration the args * @param args */ private void configureArgs(String[] args) { for(int i=0;i<args.length;i++){ if("-i1".equals(args[i])){ input1=args[++i]; } if("-i2".equals(args[i])){ input2=args[++i]; } if("-o".equals(args[i])){ output=args[++i]; } if("-delimiter".equals(args[i])){ delimiter=args[++i]; } } } public static void printUsage(){ System.err.println("Usage:"); System.err.println("-i1 input \t user data path."); System.err.println("-i2 input \t phone data path."); System.err.println("-o output \t output data path."); System.err.println("-delimiter data delimiter , default is comma ."); } }
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Single mapper that handles both data sets and tells them apart by the name
 * of the directory containing the split's file.
 *
 * <pre>
 * input1: username,phone
 * input2: phone,description
 *
 * output:
 *   &lt;key,value&gt; --&gt; &lt;[phone],[0,username]&gt;
 *   &lt;key,value&gt; --&gt; &lt;[phone],[1,description]&gt;
 * </pre>
 *
 * @author fansy
 */
public class MultipleMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {

    /** Field delimiter read from the job configuration; defaults to a comma. */
    private String delimiter = null;

    /** {@code true} when the split's file lives in a directory named "user". */
    private boolean flag = false;

    /**
     * Reads the delimiter and inspects the parent directory name of the
     * split's file to decide which data set this task is processing.
     *
     * @param cxt task context that supplies the configuration and the input split
     */
    @Override
    public void setup(Context cxt) {
        this.delimiter = cxt.getConfiguration().get("delimiter", ",");

        final InputSplit split = cxt.getInputSplit();
        final FileSplit fileSplit = (FileSplit) split;
        final String parentDirName = fileSplit.getPath().getParent().getName();
        flag = flag || "user".equals(parentDirName);
    }

    /**
     * Emits {@code <phone, (0, username)>} for user lines and
     * {@code <phone, (1, description)>} for all other lines; lines that do not
     * have exactly two fields produce no output.
     *
     * @param key   position of the line in the input (unused)
     * @param value one input line
     * @param cxt   task context used to emit the pair
     * @throws IOException          if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        final String[] fields = value.toString().split(delimiter);
        if (fields.length != 2) {
            return;
        }

        final Text outKey;
        final FlagStringDataType outValue;
        if (flag) {
            // user line: username,phone
            outKey = new Text(fields[1]);
            outValue = new FlagStringDataType(0, fields[0]);
        } else {
            // phone line: phone,description
            outKey = new Text(fields[0]);
            outValue = new FlagStringDataType(1, fields[1]);
        }
        cxt.write(outKey, outValue);
    }
}
因此針對多文件格式的輸入,不妨使用第一種方式。
分享,成長,快樂
轉載請註明blog地址:http://blog.csdn.net/fansy1990