05Hadoop-左外鏈接

場景:有兩張表,一張用戶表(user),交易表(transactions)。兩張表的字段以下:java

兩份表數據作個左鏈接,查詢出(商品名,地址)這種格式。apache

 這樣就是至關於交易表是左表,無論怎麼樣數據都要保留,而後從右邊裏面查出來彌補左表。app

效果以下:ide

 

 

 思路:寫兩個map,把兩個表的數據都讀進來,在reduce端進行鏈接,而後按照格式要求寫出去。oop

(1)map1:讀取transaction文件,封裝爲:ui

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)spa

throws IOException, InterruptedException {3d

              String lines=value.toString();code

              String[] args=lines.split(" ");orm

              String productID=args[1];

              String userID=args[2];

    //把outPutKey加了一個2,這麼作的目的是,後續在reduce端,聚合時,這個數據可以晚點到達。

              outPutKey.set(userID, "2");

              outPutValue.set("P", productID);

              context.write(outPutKey, outPutValue);

}

 

 (2)map2:讀取user文件,封裝爲:

static class map2 extends Mapper<LongWritable, Text,PairOfStrings,PairOfStrings>

{

PairOfStrings outPutKey=new PairOfStrings();

PairOfStrings outPutvalue=new PairOfStrings();

@Override

protected void map(LongWritable key, Text value,

Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] args=line.split(" ");

String userID=args[0];

String locationID=args[1];

 //把outPutKey加了一個1,這麼作的目的是,後續在reduce端,聚合時,這個數據可以早於transaction文件裏面的數據到達。

outPutKey.set(userID, "1");

outPutvalue.set("L", locationID);

context.write(outPutKey, outPutvalue);

}

 

(3)reduce:把map端的數據要根據用戶ID分區,相同的用戶ID寫入到同一個分區,進而寫入到同一個Reduce分區,而後在Reduce中根據PairOfStrings這個類的本身的排序規則對數據排序。由於前面對key作了處理(加了1,2),因此是用戶的地址這些信息先到達reduce。,而後根據不一樣的分組,把數據寫出來。 

 

 






總的代碼結構:

LeftCmain:

package com.guigu.left;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftCmain {
    
    //讀取transaction文件
    static class map1 extends Mapper<LongWritable, Text, PairOfStrings,PairOfStrings>
    {    
        PairOfStrings outPutKey=new PairOfStrings();
        PairOfStrings outPutValue=new PairOfStrings();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
                throws IOException, InterruptedException {
              String lines=value.toString();
              String[] args=lines.split(" ");
              String productID=args[1];
              String userID=args[2];
              outPutKey.set(userID, "2");
              outPutValue.set("P", productID);
              context.write(outPutKey, outPutValue);
        }
        
    }
    
    //讀取user文件
    static class map2 extends Mapper<LongWritable, Text,PairOfStrings,PairOfStrings>
    {
        PairOfStrings outPutKey=new PairOfStrings();
        PairOfStrings outPutvalue=new PairOfStrings();
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
                throws IOException, InterruptedException {
            String line=value.toString();
            String[] args=line.split(" ");
            String userID=args[0];
            String locationID=args[1];
            outPutKey.set(userID, "1");
            outPutvalue.set("L", locationID);
            context.write(outPutKey, outPutvalue);
        }
        
    }
    
    
    /**
     * 這個的關鍵點在於,取出的數據:要求先取出地址的數據。
     * @author Sxq
     *
     */
    static class reduce1 extends Reducer<PairOfStrings, PairOfStrings, Text, Text>
    {
        Text produceID=new Text();
        Text localID=new Text("undefine");
        
        @Override
        protected void reduce(PairOfStrings arg0, Iterable<PairOfStrings> Iterator1,
                Reducer<PairOfStrings, PairOfStrings, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Iterator<PairOfStrings> iterator=Iterator1.iterator();
        
            //因爲作了二次排序,能夠保證先獲得的是地址的數據。
            if(iterator.hasNext())
            {
                PairOfStrings fisrPair=iterator.next();
            //    System.out.println("firstPair="+fisrPair.toString());
                //若是是地址的信息,那就把他直接放出來
                if(fisrPair.getLeftElement().equals("L"))
                {
                    localID.set(fisrPair.getRightElement());
                }
            }
            while(iterator.hasNext())
            {
                PairOfStrings pairOfStrings=iterator.next();
                //System.out.println(pairOfStrings.toString());
                produceID.set(pairOfStrings.getRightElement());
                System.out.println("prdouct:"+produceID.toString()+"localId:"+localID.toString());
                //System.out.println();
                context.write(produceID, localID);
            }    
        }
    }
    
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LeftCmain.class);

        job.setMapperClass(map1.class);
        job.setReducerClass(reduce1.class);

        job.setMapOutputKeyClass(PairOfStrings.class);
        job.setMapOutputValueClass(PairOfStrings.class);
        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setSortComparatorClass(PairOfStrings.Comparator.class);
        
        // 在Reduce端設置分組,使得同一個用戶在同一個組,而後作拼接。
        job.setGroupingComparatorClass(SecondarySortGroupComparator.class);
        // 設置分區
        job.setPartitionerClass(SecondarySortParitioner.class);
    //    job.setOutputFormatClass(SequenceFileOutputFormat.class);
        Path transactions=new Path("/Users/mac/Desktop/transactions.txt");
        MultipleInputs.addInputPath(job,transactions,TextInputFormat.class,map1.class);
        MultipleInputs.addInputPath(job,new Path("/Users/mac/Desktop/user.txt"), TextInputFormat.class,map2.class);
        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
        
    }
    
}

 

 

SecondarySortGroupComparator:

package com.guigu.left;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import com.book.test1.CompositeKey;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * 不一樣分區的組聚合時,能夠按照咱們要的順序來排列
 * @author Sxq
 *WritableComparator
 */
public class SecondarySortGroupComparator extends WritableComparator {

    public  SecondarySortGroupComparator() {
         super(PairOfStrings.class,true);
    }
    


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        
        PairOfStrings v1=(PairOfStrings)a;
        PairOfStrings v2=(PairOfStrings)b;
        return v1.getLeftElement().compareTo(v2.getLeftElement());
    }




}

SecondarySortParitioner:

package com.guigu.left;

import org.apache.hadoop.mapreduce.Partitioner;

import edu.umd.cloud9.io.pair.PairOfStrings;
/**
 * 
 * @author Sxq
 *
 */
public class SecondarySortParitioner extends Partitioner<PairOfStrings, Object>{

    @Override
    public int getPartition(PairOfStrings key, Object value, int numPartitions) {
        return (key.getLeftElement().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }

}

 

 

運行結果:

相關文章
相關標籤/搜索