MapReduce的一對多鏈接操做

問題描述:java

一個trade table表apache

product1"trade1數組

product2"trade2app

product3"trade3ide

一個pay table表oop

product1"pay1測試

product2"pay2this

product2"pay3spa

product1"pay4命令行

product3"pay5

product3"pay6

創建兩個表之間的鏈接,該兩表是一對多關係的

以下:

trade1pay1

trade1pay4

trade2pay2

...

思路:

       爲了將兩個表整合到一塊兒,因爲有相同的第一列,且第一個表與第二個表是一對多關係的。

這裏依然採用分組,以及組內排序,只要保證一方最早到達reduce端,則就能夠進行迭代處理了。

爲了保證第一個表先到達reduce端,能夠爲定義一個組合鍵,包含兩個值,第一個值爲product,第二個值爲0或者1,來分別表明第一個表和第二個表,只要按照組內升序排列便可。

具體代碼:

自定義組合鍵策略

package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
    //product1 0/1
    private String firstKey;//product1
    private int secondKey;//0,1;0表明是trade表,1表明是pay表
    //只須要保證trade表在pay表前面就行,則只須要對組順序排列
                                                          
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
                                                          
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextIntPair tip=(TextIntPair)o;
        return this.getFirstKey().compareTo(tip.getFirstKey());
    }
}

分組策略

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
    protected TextComparator() {
        super(TextIntPair.class,true);//註冊比較器
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair tip1=(TextIntPair)a;
        TextIntPair tip2=(TextIntPair)b;
        return tip1.getFirstKey().compareTo(tip2.getFirstKey());
    }
}

組內排序策略:目的是保證第一個表比第二個表先到達

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分組內部進行排序,按照第二個字段進行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextIntPair.class,true);
    }
    //這裏能夠進行排序的方式管理
    //必須保證是同一個分組的
    //a與b進行比較
    //若是a在前b在後,則會產生升序
    //若是a在後b在前,則會產生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextIntPair ti1=(TextIntPair)a;
        TextIntPair ti2=(TextIntPair)b;
        //首先要保證是同一個組內,同一個組的標識就是第一個字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
           return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
           return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
    }
                                     
}

分區策略:

package whut.onetomany;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionByText extends Partitioner<TextIntPair, Text> {
    @Override
    public int getPartition(TextIntPair key, Text value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

MapReduce

package whut.onetomany;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JoinMain extends Configured implements Tool {
    public static class JoinMapper extends Mapper<LongWritable, Text, TextIntPair, Text>
    {
        private TextIntPair tp=new TextIntPair();
        private Text val=new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //獲取要處理的文件的名稱
            FileSplit file=(FileSplit)context.getInputSplit();
            String fileName=file.getPath().toString();
            //獲取輸入行分隔
            String line=value.toString();
            String[] lineKeyValue=line.split("\"");
            String lineKey=lineKeyValue[0];
            String lineValue=lineKeyValue[1];
            tp.setFirstKey(lineKey);
            //判斷是不是trade文件
            if(fileName.indexOf("trade")>=0)
            {
                tp.setSecondKey(0);
                val.set(lineValue);
            }
            //判斷是不是pay文件
            else if(fileName.indexOf("pay")>=0)
            {
                tp.setSecondKey(1);
                val.set(lineValue);
            }
            context.write(tp, val);
        }
    }
                      
    public static class JoinReducer extends Reducer<TextIntPair, Text, Text, Text>
    {
        @Override
        protected void reduce(TextIntPair key, Iterable<Text> values,
                Context context)throws IOException, InterruptedException {
            Iterator<Text> valList=values.iterator();
            //注意這裏必定要寫成string不可變,寫成Text有問題
            //Text trade=valList.next();
            String tradeName=valList.next().toString();
            while(valList.hasNext())
            {
                Text pay=valList.next();
                context.write(new Text(tradeName), pay);
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf=getConf();
        Job job=new Job(conf,"JoinJob");
        job.setJarByClass(JoinMain.class);
        //ToolRunner已經利用GenericOptionsParser解析了命令行中的參數
        //而且將其存放在數組中,傳遞給該run()方法了
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        //輸入文件必須以,隔開
        //FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
                          
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        //設置分區方法
        job.setPartitionerClass(PartitionByText.class);
        //設置分組排序
        job.setGroupingComparatorClass(TextComparator.class);
        job.setSortComparatorClass(TextIntComparator.class);
                          
        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        int exitCode=job.isSuccessful()?0:1;
        return exitCode;
    }
    public static void main(String[] args)throws Exception
    {
        // TODO Auto-generated method stub
        int code=ToolRunner.run(new JoinMain(), args);
        System.exit(code);
    }
}


注意:

     通常有些地方沒有定義組內排序策略,可是通過屢次測試,發現沒法保證第一個表在第二個表以前到達,則這裏就自定義了組內排序策略。版本號爲Hadoop1.1.2

相關文章
相關標籤/搜索