問題描述:
一個 trade 表(每行以雙引號 " 分隔 product 與 trade 值):
product1"trade1
product2"trade2
product3"trade3
一個 pay 表(格式相同):
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6
要在這兩個表之間按 product 建立連接(join),兩表是一對多的關係(一條 trade 記錄對應多條 pay 記錄),期望輸出如下:
trade1	pay1
trade1	pay4
trade2	pay2
...
思路:
要把兩個表整合到一起:兩表的第一列(product)相同,且第一個表與第二個表是一對多的關係。
這裏依然採用分組加組內排序的方式:只要保證 trade 表(「一」方)的記錄最先到達 reduce 端,就可以在迭代同一組的值時逐條進行連接處理。
爲了保證第一個表先到達 reduce 端,可以定義一個組合鍵,它包含兩個值:第一個值爲 product,第二個值爲 0 或 1,分別代表第一個表和第二個表;分區和分組只按 product 進行,組內再按第二個值升序排列即可。
具體代碼:
自定義組合鍵(TextIntPair)
package whut.onetomany;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key {@code (firstKey, secondKey)} used by the one-to-many join.
 *
 * <p>{@code firstKey} is the join key (the product, e.g. {@code product1}); {@code secondKey}
 * tags the source table: 0 for the trade table, 1 for the pay table. Ordering on the tag within
 * one product is what puts the trade record ahead of its pay records.
 */
public class TextIntPair implements WritableComparable<TextIntPair> {

    /** Join key, e.g. {@code product1}. Must be non-null before {@link #write} is called. */
    private String firstKey;

    /** Source-table tag: 0 = trade table, 1 = pay table. */
    private int secondKey;

    /** @return the join key (product) */
    public String getFirstKey() {
        return firstKey;
    }

    /** @param firstKey the join key (product) */
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    /** @return the source-table tag (0 = trade, 1 = pay) */
    public int getSecondKey() {
        return secondKey;
    }

    /** @param secondKey the source-table tag (0 = trade, 1 = pay) */
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    /** Serializes the product as modified UTF-8 followed by the tag as a 4-byte int. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    /** Reads the fields back in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    /**
     * Orders by product, then by table tag ascending.
     *
     * <p>Comparing the tag as well keeps the natural ordering consistent with {@link #equals}
     * and with the (product, tag) sort order the join depends on; comparing only the product
     * would make a trade key and a pay key of the same product compare as equal.
     *
     * @throws NullPointerException if either key's product is null
     */
    @Override
    public int compareTo(TextIntPair other) {
        int byProduct = firstKey.compareTo(other.firstKey);
        if (byProduct != 0) {
            return byProduct;
        }
        return Integer.compare(secondKey, other.secondKey);
    }

    /** Two keys are equal when both the product and the table tag match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TextIntPair)) {
            return false;
        }
        TextIntPair that = (TextIntPair) obj;
        return secondKey == that.secondKey
                && (firstKey == null ? that.firstKey == null : firstKey.equals(that.firstKey));
    }

    /**
     * Hashes on the product only (still consistent with {@link #equals}), so a hash-based
     * partitioner would also route every record of one product to the same reducer.
     */
    @Override
    public int hashCode() {
        return firstKey == null ? 0 : firstKey.hashCode();
    }

    @Override
    public String toString() {
        return firstKey + "\t" + secondKey;
    }
}
分組策略
package whut.onetomany; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TextComparator extends WritableComparator{ protected TextComparator() { super(TextIntPair.class,true);//註冊比較器 } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextIntPair tip1=(TextIntPair)a; TextIntPair tip2=(TextIntPair)b; return tip1.getFirstKey().compareTo(tip2.getFirstKey()); } }
組內排序策略(排序比較器):目的是保證同一 product 下,第一個表的記錄比第二個表的記錄先到達 reduce 端
package whut.onetomany; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //分組內部進行排序,按照第二個字段進行排序 public class TextIntComparator extends WritableComparator { public TextIntComparator() { super(TextIntPair.class,true); } //這裏能夠進行排序的方式管理 //必須保證是同一個分組的 //a與b進行比較 //若是a在前b在後,則會產生升序 //若是a在後b在前,則會產生降序 @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextIntPair ti1=(TextIntPair)a; TextIntPair ti2=(TextIntPair)b; //首先要保證是同一個組內,同一個組的標識就是第一個字段相同 if(!ti1.getFirstKey().equals(ti2.getFirstKey())) return ti1.getFirstKey().compareTo(ti2.getFirstKey()); else return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1 } }
分區策略:
package whut.onetomany; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PartitionByText extends Partitioner<TextIntPair, Text> { @Override public int getPartition(TextIntPair key, Text value, int numPartitions) { // TODO Auto-generated method stub return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; } }
MapReduce
package whut.onetomany;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reduce-side one-to-many join of a trade table and a pay table on their shared first column.
 *
 * <p>Each input line has the form {@code <product>"<value>}. The mapper tags every record with
 * the table it came from (0 = trade, 1 = pay) inside a {@link TextIntPair} key. The job
 * partitions and groups by product only, but sorts by (product, tag), so within one reduce
 * call the trade record is seen before the pay records. The reducer emits one
 * (trade, pay) pair per pay record.
 *
 * <p>Usage: {@code JoinMain <trade input> <pay input> <output dir>}; generic Hadoop options are
 * accepted because the driver runs through {@link ToolRunner}.
 */
public class JoinMain extends Configured implements Tool {

    /** Value of {@link TextIntPair#getSecondKey()} for trade-table records. */
    private static final int TRADE_TAG = 0;

    /** Value of {@link TextIntPair#getSecondKey()} for pay-table records. */
    private static final int PAY_TAG = 1;

    /** Counters for input the job skips instead of joining. */
    public enum JoinCounter {
        /** Line without a {@code "} separator or without a value after it. */
        MALFORMED_LINE,
        /** Line from a file whose path mentions neither "trade" nor "pay". */
        UNKNOWN_SOURCE,
        /** Product that has pay records but no trade record. */
        MISSING_TRADE,
        /** Extra trade record for a product that already had one. */
        DUPLICATE_TRADE
    }

    /** Parses one input line and emits it under a (product, table tag) composite key. */
    public static class JoinMapper extends Mapper<LongWritable, Text, TextIntPair, Text> {
        // Reused across map() calls; the framework serializes them on write, so reuse is safe.
        private final TextIntPair tp = new TextIntPair();
        private final Text val = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The source table is inferred from the path of the file being processed.
            FileSplit file = (FileSplit) context.getInputSplit();
            String fileName = file.getPath().toString();

            int tag;
            if (fileName.contains("trade")) {
                tag = TRADE_TAG;
            } else if (fileName.contains("pay")) {
                tag = PAY_TAG;
            } else {
                // Neither table: skip instead of emitting a tag/value left over from a prior record.
                context.getCounter(JoinCounter.UNKNOWN_SOURCE).increment(1);
                return;
            }

            // Split `<product>"<value>`; a line lacking either part would otherwise throw
            // ArrayIndexOutOfBoundsException and fail the whole task.
            String[] lineKeyValue = value.toString().split("\"");
            if (lineKeyValue.length < 2) {
                context.getCounter(JoinCounter.MALFORMED_LINE).increment(1);
                return;
            }

            tp.setFirstKey(lineKeyValue[0]);
            tp.setSecondKey(tag);
            val.set(lineKeyValue[1]);
            context.write(tp, val);
        }
    }

    /**
     * Joins one product's records: the first value must be the trade record (tag 0); every
     * following pay record is emitted paired with it.
     */
    public static class JoinReducer extends Reducer<TextIntPair, Text, Text, Text> {
        // Private copy of the current group's trade value.
        private final Text trade = new Text();

        @Override
        protected void reduce(TextIntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> valList = values.iterator();

            // Hadoop reuses the value object across next() calls, so the trade value must be
            // copied (a String or Text.set copy) before iterating further. The framework also
            // refills `key` from each record as values are read, so its tag identifies the
            // table of the value just returned.
            trade.set(valList.next());
            if (key.getSecondKey() != TRADE_TAG) {
                // Only pay records for this product: there is no trade to pair them with.
                context.getCounter(JoinCounter.MISSING_TRADE).increment(1);
                return;
            }

            while (valList.hasNext()) {
                Text pay = valList.next();
                if (key.getSecondKey() == TRADE_TAG) {
                    // A further trade record is not a pay record; never emit it as one.
                    context.getCounter(JoinCounter.DUPLICATE_TRADE).increment(1);
                    continue;
                }
                context.write(trade, pay);
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args trade input path, pay input path, output directory (generic Hadoop options
     *     have already been removed by ToolRunner/GenericOptionsParser)
     * @return 0 on success, 1 if the job failed, 2 on a usage error
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: JoinMain <trade input> <pay input> <output dir>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        Configuration conf = getConf();
        Job job = new Job(conf, "JoinJob");
        job.setJarByClass(JoinMain.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // Partition and group by product only, but sort by (product, tag) so the trade record
        // reaches the reducer ahead of its pay records.
        job.setPartitionerClass(PartitionByText.class);
        job.setGroupingComparatorClass(TextComparator.class);
        job.setSortComparatorClass(TextIntComparator.class);

        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point: parses generic options via ToolRunner and exits with the job's status. */
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new JoinMain(), args);
        System.exit(code);
    }
}
注意:
一些示例中沒有定義組內排序策略,但經過多次測試發現,這樣無法保證第一個表的記錄在第二個表之前到達:若排序時只比較 product,同一組內 0 與 1 的先後順序就沒有保證。因此這裏自定義了組內排序策略。所用 Hadoop 版本爲 1.1.2。