Hadoop中MapReduce多種join實現實例分析

1、概述 java

對於RDBMS中的join操做大夥必定很是熟悉,寫sql的時候要十分注意細節,稍有差池就會耗時巨久形成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操做時一樣耗時,可是因爲hadoop的分佈式設計理念的特殊性,所以對於這種join操做一樣也具有了必定的特殊性。本文主要對MapReduce框架對錶之間的join操做的幾種實現方式進行詳細分析,而且根據我在實際開發過程當中遇到的實際例子來進行進一步的說明。 sql

2、實現原理 apache

一、在Reudce端進行鏈接。 緩存

在Reudce端進行鏈接是MapReduce框架進行表之間join操做最爲常見的模式,其具體的實現原理以下: 網絡

Map端的主要工做:爲來自不一樣表(文件)的key/value對打標籤以區別不一樣來源的記錄。而後用鏈接字段做爲key,其他部分和新加的標誌做爲value,最後進行輸出。 app

reduce端的主要工做:在reduce端以鏈接字段做爲key的分組已經完成,咱們只須要在每個分組當中將那些來源於不一樣文件的記錄(在map階段已經打標誌)分開,最後進行笛卡爾只就ok了。原理很是簡單,下面來看一個實例: 框架

(1)自定義一個value返回類型: 分佈式

package com.mr.reduceSizeJoin;   
import java.io.DataInput;   
import java.io.DataOutput;   
import java.io.IOException;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.io.WritableComparable;   
public class CombineValues implements WritableComparable<CombineValues>{   
    //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);   
    private Text joinKey;//連接關鍵字   
    private Text flag;//文件來源標誌   
    private Text secondPart;//除了連接鍵外的其餘部分   
    public void setJoinKey(Text joinKey) {   
        this.joinKey = joinKey;   
    }   
    public void setFlag(Text flag) {   
        this.flag = flag;   
    }   
    public void setSecondPart(Text secondPart) {   
        this.secondPart = secondPart;   
    }   
    public Text getFlag() {   
        return flag;   
    }   
    public Text getSecondPart() {   
        return secondPart;   
    }   
    public Text getJoinKey() {   
        return joinKey;   
    }   
    public CombineValues() {   
        this.joinKey =  new Text();   
        this.flag = new Text();   
        this.secondPart = new Text();   
    }
 
    @Override 
    public void write(DataOutput out) throws IOException {   
        this.joinKey.write(out);   
        this.flag.write(out);   
        this.secondPart.write(out);   
    }   
    @Override 
    public void readFields(DataInput in) throws IOException {   
        this.joinKey.readFields(in);   
        this.flag.readFields(in);   
        this.secondPart.readFields(in);   
    }   
    @Override 
    public int compareTo(CombineValues o) {   
        return this.joinKey.compareTo(o.getJoinKey());   
    }   
    @Override 
    public String toString() {   
        // TODO Auto-generated method stub   
        return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
    }   
}

(2)map、reduce主體代碼 ide

package com.mr.reduceSizeJoin;   
import java.io.IOException;   
import java.util.ArrayList;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 * 用途說明:   
 * reudce side join中的left outer join   
 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
 * tb_dim_city.dat文件內容,分隔符爲"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其餘        9999     9999         0   
 * 1       長春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       遼源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------風騷的分割線-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件內容,分隔符爲"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 *   
 * -------------------------風騷的分割線-------------------------------   
 *  結果:   
 *  1   長春  1   901 1   1   2G  123   
 *  1   長春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //得到文件輸入路徑   
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //數據來自tb_dim_city.dat文件,標誌即爲"0"   
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //過濾格式錯誤的記錄   
                if(valueItems.length != 5){   
                    return;   
                }   
                flag.set("0");   
                joinKey.set(valueItems[0]);   
                secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                combineValues.setFlag(flag);   
                combineValues.setJoinKey(joinKey);   
                combineValues.setSecondPart(secondPart);   
                context.write(combineValues.getJoinKey(), combineValues);
 
                }//數據來自於tb_user_profiles.dat,標誌即爲"1"   
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //過濾格式錯誤的記錄   
                if(valueItems.length != 4){   
                    return;   
                }   
                flag.set("1");   
                joinKey.set(valueItems[3]);   
                secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                combineValues.setFlag(flag);   
                combineValues.setJoinKey(joinKey);   
                combineValues.setSecondPart(secondPart);   
                context.write(combineValues.getJoinKey(), combineValues);   
            }   
        }   
    }   
    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存儲一個分組中的左表信息   
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存儲一個分組中的右表信息   
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /**   
         * 一個分組調用一次reduce函數   
         */ 
        @Override 
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /**   
             * 將分組中的元素按照文件分別進行存放   
             * 這種方法要注意的問題:   
             * 若是一個分組內的元素太多的話,可能會致使在reduce階段出現OOM,   
             * 在處理分佈式問題以前最好先了解數據的分佈狀況,根據不一樣的分佈採起最   
             * 適當的處理方法,這樣能夠有效的防止致使OOM和數據過分傾斜問題。   
             */ 
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city   
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles   
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
          Configuration conf=getConf(); //得到配置文件對象   
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
            FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
            job.setMapperClass(LeftOutJoinMapper.class);   
            job.setReducerClass(LeftOutJoinReducer.class);
            job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格格式
 
            //設置map的輸出key和value類型   
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);
 
            //設置reduce的輸出key和value類型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block   
            logger.error(e.getMessage());   
        }   
    }   
}



 

其中具體的分析以及數據的輸出輸入請看代碼中的註釋已經寫得比較清楚了,這裏主要分析一下reduce join的一些不足。之因此會存在reduce join這種方式,咱們能夠很明顯的看出原:由於總體數據被分割了,每一個map task只處理一部分數據而不可以獲取到全部須要的join字段,所以咱們須要在講join key做爲reduce端的分組將全部join key相同的記錄集中起來進行處理,因此reduce join這種方式就出現了。這種方式的缺點很明顯就是會形成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。 函數

二、在Map端進行鏈接。

使用場景:一張表十分小、一張表很大。

用法:在提交做業的時候先將小表文件放到該做業的DistributedCache中,而後從DistributeCache中取出該小表進行join key / value解釋分割放到內存中(能夠放大Hash Map等等容器中)。而後掃描大表,看大表中的每條記錄的join key /value值是否可以在內存中找到相同join key的記錄,若是有則直接輸出結果。

直接上代碼,比較簡單:

package com.mr.mapSideJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.HashMap;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 *   
 * 用途說明:   
 * Map side join中的left outer join   
 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   
 * 假設tb_dim_city文件記錄數不多,tb_dim_city.dat文件內容,分隔符爲"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其餘        9999     9999         0   
 * 1       長春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       遼源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------風騷的分割線-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件內容,分隔符爲"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 * -------------------------風騷的分割線-------------------------------   
 *  結果:   
 *  1   長春  1   901 1   1   2G  123   
 *  1   長春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class MapSideJoinMain extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {
 
        private HashMap<String,String> city_info = new HashMap<String, String>();   
        private Text outPutKey = new Text();   
        private Text outPutValue = new Text();   
        private String mapInputStr = null;   
        private String mapInputSpit[] = null;   
        private String city_secondPart = null;   
        /**   
         * 此方法在每一個task開始以前執行,這裏主要用做從DistributedCache   
         * 中取到tb_dim_city文件,並將裏邊記錄取出放到內存中。   
         */ 
        @Override 
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //得到當前做業的DistributedCache相關文件   
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String cityInfo = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("tb_dim_city.dat")){   
                    //讀緩存文件,並放到mem中   
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(cityInfo=br.readLine())){   
                        String[] cityPart = cityInfo.split("\\|",5);   
                        if(cityPart.length ==5){   
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
                        }   
                    }   
                }   
            }   
        }
 
        /**   
         * Map端的實現至關簡單,直接判斷tb_user_profiles.dat中的   
         * cityID是否存在個人map中就ok了,這樣就能夠實現Map Join了   
         */ 
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //排掉空行   
            if(value == null || value.toString().equals("")){   
                return;   
            }   
            mapInputStr = value.toString();   
            mapInputSpit = mapInputStr.split("\\|",4);   
            //過濾非法記錄   
            if(mapInputSpit.length != 4){   
                return;   
            }   
            //判斷連接字段是否在map中存在   
            city_secondPart = city_info.get(mapInputSpit[3]);   
            if(city_secondPart != null){   
                this.outPutKey.set(mapInputSpit[3]);   
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
                context.write(outPutKey, outPutValue);   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //得到配置文件對象   
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//爲該job添加緩存文件   
            Job job=new Job(conf,"MapJoinMR");   
            job.setNumReduceTasks(0);
 
            FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑
 
            job.setJarByClass(MapSideJoinMain.class);   
            job.setMapperClass(LeftOutJoinMapper.class);
 
            job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
 
            //設置map的輸出key和value類型   
            job.setMapOutputKeyClass(Text.class);
 
            //設置reduce的輸出key和value類型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block   
            logger.error(e.getMessage());   
        }   
    }   
}

這裏說說DistributedCache。DistributedCache是分佈式緩存的一種實現,它在整個MapReduce框架中起着至關重要的做用,他能夠支撐咱們寫一些至關複雜高效的分佈式程序。說回到這裏,JobTracker在做業啓動以前會獲取到DistributedCache的資源uri列表,並將對應的文件分發到各個涉及到該做業的任務的TaskTracker上。另外,關於DistributedCache和做業的關係,好比權限、存儲路徑區分、public和private等屬性,接下來有用再整理研究一下寫一篇blog,這裏就不詳細說了。

另外還有一種比較變態的Map Join方式,就是結合HBase來作Map Join操做。這種方式徹底能夠突破內存的控制,使你毫無忌憚的使用Map Join,並且效率也很是不錯。

三、SemiJoin。

SemiJoin就是所謂的半鏈接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數據,在網絡中只傳輸參與鏈接的數據不參與鏈接的數據沒必要在網絡中進行傳輸,從而減小了shuffle的網絡傳輸量,使總體效率獲得提升,其餘思想和reduce join是如出一轍的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來經過DistributedCach分發到相關節點,而後將其取出放到內存中(能夠放到HashSet中),在map階段掃描鏈接表,將join key不在內存HashSet中的記錄過濾掉,讓那些參與join的記錄經過shuffle傳輸到reduce端進行join操做,其餘的和reduce join都是同樣的。看代碼:

package com.mr.SemiJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.ArrayList;   
import java.util.HashSet;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 *   
 * 用途說明:   
 * reudce side join中的left outer join   
 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
 * tb_dim_city.dat文件內容,分隔符爲"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其餘        9999     9999         0   
 * 1       長春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       遼源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------風騷的分割線-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件內容,分隔符爲"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 * -------------------------風騷的分割線-------------------------------   
 * joinKey.dat內容:   
 * city_code   
 * 1   
 * 2   
 * 3   
 * 4   
 * -------------------------風騷的分割線-------------------------------   
 *  結果:   
 *  1   長春  1   901 1   1   2G  123   
 *  1   長春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class SemiJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private HashSet<String> joinKeySet = new HashSet<String>();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        /**   
         * 將參加join的key從DistributedCache取出放到內存中,以便在map端將要參加join的key過濾出來。b   
         */ 
        @Override 
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //得到當前做業的DistributedCache相關文件   
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String joinKeyStr = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("joinKey.dat")){   
                    //讀緩存文件,並放到mem中   
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(joinKeyStr=br.readLine())){   
                        joinKeySet.add(joinKeyStr);   
                    }   
                }   
            }   
        }   
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //得到文件輸入路徑   
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //數據來自tb_dim_city.dat文件,標誌即爲"0"   
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //過濾格式錯誤的記錄   
                if(valueItems.length != 5){   
                    return;   
                }   
                //過濾掉不須要參加join的記錄   
                if(joinKeySet.contains(valueItems[0])){   
                    flag.set("0");   
                    joinKey.set(valueItems[0]);   
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }//數據來自於tb_user_profiles.dat,標誌即爲"1"   
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //過濾格式錯誤的記錄   
                if(valueItems.length != 4){   
                    return;   
                }   
                //過濾掉不須要參加join的記錄   
                if(joinKeySet.contains(valueItems[3])){   
                    flag.set("1");   
                    joinKey.set(valueItems[3]);   
                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }   
        }   
    }   
    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存儲一個分組中的左表信息   
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存儲一個分組中的右表信息   
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /**   
         * 一個分組調用一次reduce函數   
         */ 
        @Override 
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /**   
             * 將分組中的元素按照文件分別進行存放   
             * 這種方法要注意的問題:   
             * 若是一個分組內的元素太多的話,可能會致使在reduce階段出現OOM,   
             * 在處理分佈式問題以前最好先了解數據的分佈狀況,根據不一樣的分佈採起最   
             * 適當的處理方法,這樣能夠有效的防止致使OOM和數據過分傾斜問題。   
             */ 
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city   
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles   
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //得到配置文件對象   
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(SemiJoin.class);
 
            FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
 
            job.setMapperClass(SemiJoinMapper.class);   
            job.setReducerClass(SemiJoinReducer.class);
 
            job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
 
            //設置map的輸出key和value類型   
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);
 
            //設置reduce的輸出key和value類型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new SemiJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            logger.error(e.getMessage());   
        }   
    }   
}

這裏還說說SemiJoin也是有必定的適用範圍的,其抽取出來進行join的key是要放到內存中的,因此不可以太大,容易在Map端形成OOM。

3、總結

blog介紹了三種join方式。這三種join方式適用於不一樣的場景,其處理效率上的相差仍是蠻大的,其中主要致使因素是網絡傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分佈式大數據處理程序的時最好要對總體要處理的數據分佈狀況做一個瞭解,這能夠提升咱們代碼的效率,使數據的傾斜度降到最低,使咱們的代碼傾向性更好。

相關文章
相關標籤/搜索