MapReduce中的Join

一. MR中的join的兩種方式:

1.reduce side join(面試題)

reduce side join是一種最簡單的join方式,其主要思想以下:html

在map階段,map函數同時讀取兩個文件File1和File2,爲了區分兩種來源的key/value對,對每條數據打一個標籤(tag),好比:tag=1表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不一樣文件中的數據打標籤,在shuffle階段已經天然按key分組.java

在reduce階段,reduce函數獲取相同k2的v2 list(v2來自File1和File2), 而後對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的鏈接操做。node

這種方法有2個問題:面試

1, map階段沒有對數據瘦身,shuffle的網絡傳輸和排序性能很低。

2, reduce端對2個集合作乘積計算,很耗內存,容易致使OOM。

我關於reduce side join的博文總結地址:http://www.cnblogs.com/DreamDrive/p/7692042.html數據庫

2.map side join(面試題)

之因此存在reduce side join,是由於在map階段不能獲取全部須要的join字段,即:同一個key對應的字段可能位於不一樣map中。Reduce side join是很是低效的,由於shuffle階段要進行大量的數據傳輸。apache

Map side join是針對如下場景進行的優化:緩存

兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣,咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),服務器

而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。網絡

 

爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下:app

(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://namenode:9000/home/XXX/file,其中9000是本身配置的NameNode端口號)。Job在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個Container的本地磁盤上。

(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。

 這種方法的侷限性:

這種方法,要使用hadoop中的DistributedCache把小數據分佈到各個計算節點,每一個map節點都要把小數據庫加載到內存,按關鍵字創建索引。
這種方法有明顯的侷限性:有一份數據比較小,在map端,可以把它加載到內存,並進行join操做。

 

3.針對Map Side Join 侷限的解決方法:

①使用內存服務器,擴大節點的內存空間

針對map join,能夠把一份數據存放到專門的內存服務器,在map()方法中,對每個<key,value>的輸入對,根據key到內存服務器中取出數據,進行鏈接

②使用BloomFilter過濾空鏈接的數據

對其中一份數據在內存中創建BloomFilter,另一份數據在鏈接以前,用BloomFilter判斷它的key是否存在,若是不存在,那這個記錄是空鏈接,能夠忽略。

③使用mapreduce專爲join設計的包

在mapreduce包裏看到有專門爲join設計的包,對這些包尚未學習,不知道怎麼使用,只是在這裏記錄下來,做個提醒。

jar: mapreduce-client-core.jar

package: org.apache.hadoop.mapreduce.lib.join

 4.具體Map Side Join的使用

有客戶數據customer和訂單數據orders。

customer

客戶編號 姓名 地址 電話
1 hanmeimei ShangHai 110
2 leilei BeiJing 112
3 lucy GuangZhou 119

** order**

訂單編號 客戶編號 其它字段被忽略
1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300

要求對customer和orders按照客戶編號進行鏈接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不做要求

客戶編號 訂單編號 訂單金額 姓名 地址 電話
1 1 50 hanmeimei ShangHai 110
1 2 200 hanmeimei ShangHai 110
1 6 42 hanmeimei ShangHai 110
1 7 352 hanmeimei ShangHai 110
2 8 1135 leilei BeiJing 112
2 9 400 leilei BeiJing 112
2 10 2000 leilei BeiJing 112
2 11 300 leilei BeiJing 112
3 3 15 lucy GuangZhou 119
3 4 350 lucy GuangZhou 119
3 5 58 lucy GuangZhou 119
  1. 在提交job的時候,把小數據經過DistributedCache分發到各個節點。
  2. map端使用DistributedCache讀到數據,在內存中構建映射關係--若是使用專門的內存服務器,就把數據加載到內存服務器,map()節點能夠只保留一份小緩存;若是使用BloomFilter來加速,在這裏就能夠構建;
  3. map()函數中,對每一對<key,value>,根據key到第2)步構建的映射裏面中找出數據,進行鏈接,輸出。

上代碼:

  1 public class MapSideJoin extends Configured implements Tool {
  2     // customer文件在hdfs上的位置。
  3     private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
  4     //客戶數據表對應的實體類
  5     private static class CustomerBean {
  6         private int custId;
  7         private String name;
  8         private String address;
  9         private String phone;
 10         
 11         public CustomerBean() {
 12         }
 13         
 14         public CustomerBean(int custId, String name, String address,String phone) {
 15             super();
 16             this.custId = custId;
 17             this.name = name;
 18             this.address = address;
 19             this.phone = phone;
 20         }
 21         
 22         public int getCustId() {
 23             return custId;
 24         }
 25 
 26         public String getName() {
 27             return name;
 28         }
 29 
 30         public String getAddress() {
 31             return address;
 32         }
 33 
 34         public String getPhone() {
 35             return phone;
 36         }
 37     }
 38     //客戶訂單對應的實體類
 39     private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
 40         private int custId;
 41         private int orderId;
 42 
 43         public void set(int custId, int orderId) {
 44             this.custId = custId;
 45             this.orderId = orderId;
 46         }
 47         
 48         public int getCustId() {
 49             return custId;
 50         }
 51         
 52         public int getOrderId() {
 53             return orderId;
 54         }
 55         
 56         @Override
 57         public void write(DataOutput out) throws IOException {
 58             out.writeInt(custId);
 59             out.writeInt(orderId);
 60         }
 61 
 62         @Override
 63         public void readFields(DataInput in) throws IOException {
 64             custId = in.readInt();
 65             orderId = in.readInt();
 66         }
 67 
 68         @Override
 69         public int compareTo(CustOrderMapOutKey o) {
 70             int res = Integer.compare(custId, o.custId);
 71             return res == 0 ? Integer.compare(orderId, o.orderId) : res;
 72         }
 73         
 74         @Override
 75         public boolean equals(Object obj) {
 76             if (obj instanceof CustOrderMapOutKey) {
 77                 CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
 78                 return custId == o.custId && orderId == o.orderId;
 79             } else {
 80                 return false;
 81             }
 82         }
 83         
 84         @Override
 85         public String toString() {
 86             return custId + "\t" + orderId;
 87         }
 88     }
 89     
 90     private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
 91         private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
 92         private final Text outputValue = new Text();
 93         /**
 94          * 把表中每一行的客戶信息封裝成一個Map,存儲在內存中
 95          * Map的key是客戶的id,value是封裝的客戶bean對象
 96          */
 97         private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
 98         @Override
 99         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
100             // 格式: 訂單編   客戶編號    訂單金額
101             String[] cols = value.toString().split("\t");           
102             if (cols.length < 3) {
103                 return;
104             }
105             
106             int custId = Integer.parseInt(cols[1]);// 取出客戶編號
107             CustomerBean customerBean = CUSTOMER_MAP.get(custId);
108             
109             if (customerBean == null) {// 沒有對應的customer信息能夠鏈接
110                 return;
111             }
112             
113             StringBuffer sb = new StringBuffer();
114             sb.append(cols[2]).append("\t")
115                 .append(customerBean.getName()).append("\t")
116                 .append(customerBean.getAddress()).append("\t")
117                 .append(customerBean.getPhone());
118             outputValue.set(sb.toString());
119             outputKey.set(custId, Integer.parseInt(cols[0]));
120             context.write(outputKey, outputValue);
121         }
122         
123         //在Mapper方法執行前執行
124         @Override
125         protected void setup(Context context) throws IOException, InterruptedException {
126             FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
127             FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
128             
129             BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
130             String line = null;
131             String[] cols = null;
132             
133             // 格式:客戶編號  姓名  地址  電話
134             while ((line = reader.readLine()) != null) {
135                 cols = line.split("\t");
136                 if (cols.length < 4) {// 數據格式不匹配,忽略
137                     continue;
138                 }
139                 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
140                 CUSTOMER_MAP.put(bean.getCustId(), bean);
141             }
142         }
143     }
144 
145     /**
146      * reduce
147      */
148     private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
149         @Override
150         protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
151             // 什麼事都不用作,直接輸出
152             for (Text value : values) {
153                 context.write(key, value);
154             }
155         }
156     }
157     /**
158      * @param args
159      * @throws Exception
160      */
161     public static void main(String[] args) throws Exception {
162         if (args.length < 2) {
163             new IllegalArgumentException("Usage: <inpath> <outpath>");
164             return;
165         }
166         ToolRunner.run(new Configuration(), new Join(), args);
167     }
168 
169     @Override
170     public int run(String[] args) throws Exception {
171         Configuration conf = getConf();
172         Job job = Job.getInstance(conf, Join.class.getSimpleName());
173         job.setJarByClass(SecondarySortMapReduce.class);
174         
175         // 添加customer cache文件
176         job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
177         
178         FileInputFormat.addInputPath(job, new Path(args[0]));
179         FileOutputFormat.setOutputPath(job, new Path(args[1]));
180         
181         // map settings
182         job.setMapperClass(JoinMapper.class);
183         job.setMapOutputKeyClass(CustOrderMapOutKey.class);
184         job.setMapOutputValueClass(Text.class);
185         
186         // reduce settings
187         job.setReducerClass(JoinReducer.class);
188         job.setOutputKeyClass(CustOrderMapOutKey.class);
189         job.setOutputKeyClass(Text.class);
190         
191         boolean res = job.waitForCompletion(true);
192         return res ? 0 : 1;
193     }
194 }

上面的代碼沒有使用DistributedCache類:

5.Map Side Join的再一個例子:

  1 import java.io.BufferedReader;   
  2 import java.io.FileReader;   
  3 import java.io.IOException;   
  4 import java.util.HashMap;   
  5 import org.apache.hadoop.conf.Configuration;   
  6 import org.apache.hadoop.conf.Configured;   
  7 import org.apache.hadoop.filecache.DistributedCache;   
  8 import org.apache.hadoop.fs.Path;   
  9 import org.apache.hadoop.io.Text;   
 10 import org.apache.hadoop.mapreduce.Job;   
 11 import org.apache.hadoop.mapreduce.Mapper;   
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
 16 import org.apache.hadoop.util.Tool;   
 17 import org.apache.hadoop.util.ToolRunner;   
 18 import org.slf4j.Logger;   
 19 import org.slf4j.LoggerFactory;   
 20 /**   
 21  * 用途說明:   
 22  * Map side join中的left outer join   
 23  * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段   
 24  * table1(左表):tb_dim_city
 25  * (id int,name string,orderid int,city_code int,is_show int),   
 26  * 假設tb_dim_city文件記錄數不多
 27  * tb_dim_city.dat文件內容,分隔符爲"|":   
 28  * id     name  orderid  city_code  is_show   
 29  * 0       其餘        9999     9999         0   
 30  * 1       長春        1        901          1   
 31  * 2       吉林        2        902          1   
 32  * 3       四平        3        903          1   
 33  * 4       松原        4        904          1   
 34  * 5       通化        5        905          1   
 35  * 6       遼源        6        906          1   
 36  * 7       白城        7        907          1   
 37  * 8       白山        8        908          1   
 38  * 9       延吉        9        909          1   
 39  * -------------------------風騷的分割線-------------------------------   
 40  * table2(右表):tb_user_profiles
 41  * (userID int,userName string,network string,flow double,cityID int)   
 42  * tb_user_profiles.dat文件內容,分隔符爲"|":   
 43  * userID   network     flow    cityID   
 44  * 1           2G       123      1   
 45  * 2           3G       333      2   
 46  * 3           3G       555      1   
 47  * 4           2G       777      3   
 48  * 5           3G       666      4   
 49  * ..................................
 50  * ..................................
 51  * -------------------------風騷的分割線-------------------------------   
 52  *  結果:   
 53  *  1   長春  1   901 1   1   2G  123   
 54  *  1   長春  1   901 1   3   3G  555   
 55  *  2   吉林  2   902 1   2   3G  333   
 56  *  3   四平  3   903 1   4   2G  777   
 57  *  4   松原  4   904 1   5   3G  666   
 58  */ 
 59 public class MapSideJoinMain extends Configured implements Tool{   
 60     private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
 61     
 62     public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {
 63         private HashMap<String,String> city_infoMap = new HashMap<String, String>();   
 64         private Text outPutKey = new Text();   
 65         private Text outPutValue = new Text();   
 66         private String mapInputStr = null;   
 67         private String mapInputSpit[] = null;   
 68         private String city_secondPart = null;   
 69         /**   
 70          * 此方法在每一個task開始以前執行,這裏主要用做從DistributedCache   
 71          * 中取到tb_dim_city文件,並將裏邊記錄取出放到內存中。   
 72          */ 
 73         @Override 
 74         protected void setup(Context context) throws IOException, InterruptedException {   
 75             BufferedReader br = null;   
 76             //得到當前做業的DistributedCache相關文件   
 77             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
 78             String cityInfo = null;   
 79             for(Path p : distributePaths){   
 80                 if(p.toString().endsWith("tb_dim_city.dat")){   
 81                     //讀緩存文件,並放到mem中   
 82                     br = new BufferedReader(new FileReader(p.toString()));   
 83                     while(null!=(cityInfo=br.readLine())){   
 84                         String[] cityPart = cityInfo.split("\\|",5);   
 85                         if(cityPart.length ==5){   
 86                             city_infoMap.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
 87                         }   
 88                     }   
 89                 }   
 90             }   
 91         }
 92  
 93         /**   
 94          * Map端的實現至關簡單,直接判斷tb_user_profiles.dat中的   
 95          * cityID是否存在個人map中就ok了,這樣就能夠實現Map Join了   
 96          */ 
 97         @Override 
 98         protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {   
 99             //排掉空行   
100             if(value == null || value.toString().equals("")){   
101                 return;   
102             }   
103             mapInputStr = value.toString();   
104             mapInputSpit = mapInputStr.split("\\|",4);   
105             //過濾非法記錄   
106             if(mapInputSpit.length != 4){   
107                 return;   
108             }   
109             //判斷連接字段是否在map中存在   
110             city_secondPart = city_infoMap.get(mapInputSpit[3]);   
111             if(city_secondPart != null){   
112                 this.outPutKey.set(mapInputSpit[3]);   
113                 this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
114                 context.write(outPutKey, outPutValue);   
115             }   
116         }   
117     }   
118     @Override 
119     public int run(String[] args) throws Exception {   
120             Configuration conf=getConf(); //得到配置文件對象   
121             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//爲該job添加緩存文件   
122             Job job=new Job(conf,"MapJoinMR");   
123             job.setNumReduceTasks(0);
124  
125             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
126             FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑
127  
128             job.setJarByClass(MapSideJoinMain.class);   
129             job.setMapperClass(LeftOutJoinMapper.class);
130  
131             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
132             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
133  
134             //設置map的輸出key和value類型   
135             job.setMapOutputKeyClass(Text.class);
136  
137             //設置reduce的輸出key和value類型   
138             job.setOutputKeyClass(Text.class);   
139             job.setOutputValueClass(Text.class);   
140             job.waitForCompletion(true);   
141             return job.isSuccessful()?0:1;   
142     }   
143     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {   
144         try {   
145             int returnCode = ToolRunner.run(new MapSideJoinMain(),args);   
146             System.exit(returnCode);   
147         } catch (Exception e) {   
148             logger.error(e.getMessage());   
149         }   
150     }   
151 } 

 

 6.SemiJoin

 SemiJoin就是所謂的半鏈接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數據,在網絡中只傳輸參與鏈接的數據不參與鏈接的數據沒必要在網絡中進行傳輸,從而減小了shuffle的網絡傳輸量,使總體效率獲得提升,其餘思想和reduce join是如出一轍的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來經過DistributedCach分發到相關節點,而後將其取出放到內存中(能夠放到HashSet中),在map階段掃描鏈接表,將join key不在內存HashSet中的記錄過濾掉,讓那些參與join的記錄經過shuffle傳輸到reduce端進行join操做,其餘的和reduce join都是同樣的。看代碼:

  1 import java.io.BufferedReader;   
  2 import java.io.FileReader;   
  3 import java.io.IOException;   
  4 import java.util.ArrayList;   
  5 import java.util.HashSet;   
  6 import org.apache.hadoop.conf.Configuration;   
  7 import org.apache.hadoop.conf.Configured;   
  8 import org.apache.hadoop.filecache.DistributedCache;   
  9 import org.apache.hadoop.fs.Path;   
 10 import org.apache.hadoop.io.Text;   
 11 import org.apache.hadoop.mapreduce.Job;   
 12 import org.apache.hadoop.mapreduce.Mapper;   
 13 import org.apache.hadoop.mapreduce.Reducer;   
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
 15 import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
 19 import org.apache.hadoop.util.Tool;   
 20 import org.apache.hadoop.util.ToolRunner;   
 21 import org.slf4j.Logger;   
 22 import org.slf4j.LoggerFactory;   
 23 /**   
 24  * @author zengzhaozheng   
 25  *   
 26  * 用途說明:   
 27  * reudce side join中的left outer join   
 28  * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段   
 29  * table1(左表):tb_dim_city
 30  * (id int,name string,orderid int,city_code,is_show)   
 31  * tb_dim_city.dat文件內容,分隔符爲"|":   
 32  * id     name  orderid  city_code  is_show   
 33  * 0       其餘        9999     9999         0   
 34  * 1       長春        1        901          1   
 35  * 2       吉林        2        902          1   
 36  * 3       四平        3        903          1   
 37  * 4       松原        4        904          1   
 38  * 5       通化        5        905          1   
 39  * 6       遼源        6        906          1   
 40  * 7       白城        7        907          1   
 41  * 8       白山        8        908          1   
 42  * 9       延吉        9        909          1   
 43  * -------------------------風騷的分割線-------------------------------   
 44  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 45  * tb_user_profiles.dat文件內容,分隔符爲"|":   
 46  * userID   network     flow    cityID   
 47  * 1           2G       123      1   
 48  * 2           3G       333      2   
 49  * 3           3G       555      1   
 50  * 4           2G       777      3   
 51  * 5           3G       666      4   
 52  * ..................................
 53  * ..................................
 54  * -------------------------風騷的分割線-------------------------------   
 55  * joinKey.dat內容:   
 56  * city_code   
 57  * 1   
 58  * 2   
 59  * 3   
 60  * 4   
 61  * -------------------------風騷的分割線-------------------------------   
 62  *  結果:   
 63  *  1   長春  1   901 1   1   2G  123   
 64  *  1   長春  1   901 1   3   3G  555   
 65  *  2   吉林  2   902 1   2   3G  333   
 66  *  3   四平  3   903 1   4   2G  777   
 67  *  4   松原  4   904 1   5   3G  666   
 68  */ 
 69 public class SemiJoin extends Configured implements Tool{   
 70     private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
 71     public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
 72         private CombineValues combineValues = new CombineValues();   
 73         private HashSet<String> joinKeySet = new HashSet<String>();   
 74         private Text flag = new Text();   
 75         private Text joinKey = new Text();   
 76         private Text secondPart = new Text();   
 77         /**   
 78          * 將參加join的key從DistributedCache取出放到內存中,以便在map端將要參加join的key過濾出來。b   
 79          */ 
 80         @Override 
 81         protected void setup(Context context) throws IOException, InterruptedException {   
 82             BufferedReader br = null;   
 83             //得到當前做業的DistributedCache相關文件   
 84             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
 85             String joinKeyStr = null;   
 86             for(Path p : distributePaths){   
 87                 if(p.toString().endsWith("joinKey.dat")){   
 88                     //讀緩存文件,並放到mem中   
 89                     br = new BufferedReader(new FileReader(p.toString()));   
 90                     while(null!=(joinKeyStr=br.readLine())){   
 91                         joinKeySet.add(joinKeyStr);   
 92                     }   
 93                 }   
 94             }   
 95         }   
 96         @Override 
 97         protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {   
 98             //得到文件輸入路徑   
 99             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
100             //數據來自tb_dim_city.dat文件,標誌即爲"0"   
101             if(pathName.endsWith("tb_dim_city.dat")){   
102                 String[] valueItems = value.toString().split("\\|");   
103                 //過濾格式錯誤的記錄   
104                 if(valueItems.length != 5){   
105                     return;   
106                 }   
107                 //過濾掉不須要參加join的記錄   
108                 if(joinKeySet.contains(valueItems[0])){   
109                     flag.set("0");   
110                     joinKey.set(valueItems[0]);   
111                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
112                     combineValues.setFlag(flag);   
113                     combineValues.setJoinKey(joinKey);   
114                     combineValues.setSecondPart(secondPart);   
115                     context.write(combineValues.getJoinKey(), combineValues);   
116                 }else{   
117                     return ;   
118                 }   
119             }//數據來自於tb_user_profiles.dat,標誌即爲"1"   
120             else if(pathName.endsWith("tb_user_profiles.dat")){   
121                 String[] valueItems = value.toString().split("\\|");   
122                 //過濾格式錯誤的記錄   
123                 if(valueItems.length != 4){   
124                     return;   
125                 }   
126                 //過濾掉不須要參加join的記錄   
127                 if(joinKeySet.contains(valueItems[3])){   
128                     flag.set("1");   
129                     joinKey.set(valueItems[3]);   
130                     secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
131                     combineValues.setFlag(flag);   
132                     combineValues.setJoinKey(joinKey);   
133                     combineValues.setSecondPart(secondPart);   
134                     context.write(combineValues.getJoinKey(), combineValues);   
135                 }else{   
136                     return ;   
137                 }   
138             }   
139         }   
140     }   
141     public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
142         //存儲一個分組中的左表信息   
143         private ArrayList<Text> leftTable = new ArrayList<Text>();   
144         //存儲一個分組中的右表信息   
145         private ArrayList<Text> rightTable = new ArrayList<Text>();   
146         private Text secondPar = null;   
147         private Text output = new Text();   
148         /**   
149          * 一個分組調用一次reduce函數   
150          */ 
151         @Override 
152         protected void reduce(Text key, Iterable<CombineValues> value, Context context) throws IOException, InterruptedException {   
153             leftTable.clear();   
154             rightTable.clear();   
155             /**   
156              * 將分組中的元素按照文件分別進行存放   
157              * 這種方法要注意的問題:   
158              * 若是一個分組內的元素太多的話,可能會致使在reduce階段出現OOM,   
159              * 在處理分佈式問題以前最好先了解數據的分佈狀況,根據不一樣的分佈採起最   
160              * 適當的處理方法,這樣能夠有效的防止致使OOM和數據過分傾斜問題。   
161              */ 
162             for(CombineValues cv : value){   
163                 secondPar = new Text(cv.getSecondPart().toString());   
164                 //左表tb_dim_city   
165                 if("0".equals(cv.getFlag().toString().trim())){   
166                     leftTable.add(secondPar);   
167                 }   
168                 //右表tb_user_profiles   
169                 else if("1".equals(cv.getFlag().toString().trim())){   
170                     rightTable.add(secondPar);   
171                 }   
172             }   
173             logger.info("tb_dim_city:"+leftTable.toString());   
174             logger.info("tb_user_profiles:"+rightTable.toString());   
175             for(Text leftPart : leftTable){   
176                 for(Text rightPart : rightTable){   
177                     output.set(leftPart+ "\t" + rightPart);   
178                     context.write(key, output);   
179                 }   
180             }   
181         }   
182     }   
183     @Override 
184     public int run(String[] args) throws Exception {   
185             Configuration conf=getConf(); //得到配置文件對象   
186             DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
187             Job job=new Job(conf,"LeftOutJoinMR");   
188             job.setJarByClass(SemiJoin.class);
189  
190             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
191             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
192  
193             job.setMapperClass(SemiJoinMapper.class);   
194             job.setReducerClass(SemiJoinReducer.class);
195  
196             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
197             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
198  
199             //設置map的輸出key和value類型   
200             job.setMapOutputKeyClass(Text.class);   
201             job.setMapOutputValueClass(CombineValues.class);
202  
203             //設置reduce的輸出key和value類型   
204             job.setOutputKeyClass(Text.class);   
205             job.setOutputValueClass(Text.class);   
206             job.waitForCompletion(true);   
207             return job.isSuccessful()?0:1;   
208     }   
209     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {   
210         try {   
211             int returnCode =  ToolRunner.run(new SemiJoin(),args);   
212             System.exit(returnCode);   
213         } catch (Exception e) {   
214             logger.error(e.getMessage());   
215         }   
216     }   
217 } 

 

這裏還說說SemiJoin也是有必定的適用範圍的,其抽取出來進行join的key是要放到內存中的,因此不可以太大,容易在Map端形成OOM。

2、總結

blog介紹了三種join方式。這三種join方式適用於不一樣的場景,其處理效率上的相差仍是蠻大的,其中主要致使因素是網絡傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分佈式大數據處理程序的時最好要對總體要處理的數據分佈狀況做一個瞭解,這能夠提升咱們代碼的效率,使數據的傾斜度降到最低,使咱們的代碼傾向性更好。

相關文章
相關標籤/搜索