MapReduce實現ReduceSideJoin操做

本文轉載於:http://blog.csdn.net/xyilu/article/details/8996204java

一.準備兩張表以及對應的數據

(1)m_ys_lab_jointest_a(如下簡稱表A)

建表語句:web

create table if not exists m_ys_lab_jointest_a (  
     id bigint,  
     name string  
)  
row format delimited  
fields terminated by '9'  
lines terminated by '10'  
stored as textfile; 

具體數據以下:apache

id    name
1 北京 2 天津 3 河北 4 山西 5 內蒙古 6 遼寧 7 吉林 8 黑龍江
 
 
 
 
 
 
 
 
 
 
(2)m_ys_lab_jointest_b(如下簡稱表B)
建表語句爲:
create table if not exists m_ys_lab_jointest_b (  
     id bigint,  
     statyear bigint,  
     num bigint  
)  
row format delimited  
fields terminated by '9'  
lines terminated by '10'  
stored as textfile;  

 具體數據以下:數組

id   statyear  num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

 

 

 

 

 

 

 

咱們的目的是,以id爲key作join操做,獲得如下表:
m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

 

 

 

 

 

二.計算模型

整個計算過程是:app

(1)在map階段,把全部記錄標記成<key, value>的形式,其中key是id,value則根據來源不一樣取不一樣的形式:來源於表A的記錄,value的值爲"a#"+name;來源於表B的記錄,value的值爲"b#"+score。
(2)在reduce階段,先把每一個key下的value列表拆分爲分別來自表A和表B的兩部分,分別放入兩個向量中。而後遍歷兩個向量作笛卡爾積,造成一條條最終結果。
以下圖所示:

上代碼:oop

 1 import java.io.IOException;
 2 import java.util.ArrayList;
 3 import java.util.Iterator;
 4 import java.util.List;
 5 
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapred.FileSplit;
 9 import org.apache.hadoop.mapred.JobConf;
10 import org.apache.hadoop.mapred.MapReduceBase;
11 import org.apache.hadoop.mapred.Mapper;
12 import org.apache.hadoop.mapred.OutputCollector;
13 import org.apache.hadoop.mapred.Reducer;
14 import org.apache.hadoop.mapred.Reporter;  
15   
16 /** 
17  * MapReduce實現Join操做 
18  */  
19 public class MapRedJoin {  
20     public static final String DELIMITER = "\u0009"; // 字段分隔符  
21       
22     // map過程  
23     public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {  
24         public void configure(JobConf job) {  
25             super.configure(job);  
26         }  
27           
28         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException {  
29             // 獲取輸入文件的全路徑和名稱  
30             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
31             // 獲取記錄字符串  
32             String line = value.toString();  
33             // 拋棄空記錄  
34             if (line == null || line.equals("")){
35                 return;
36             }   
37             // 處理來自表A的記錄  
38             if (filePath.contains("m_ys_lab_jointest_a")) {  
39                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
40                 if (values.length < 2){
41                     return;
42                 }   
43                 String id = values[0]; // id  
44                 String name = values[1]; // name  
45                 output.collect(new Text(id), new Text("a#"+name));  
46             } else if (filePath.contains("m_ys_lab_jointest_b")) {// 處理來自表B的記錄
47                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
48                 if (values.length < 3){
49                     return;
50                 }  
51                 String id = values[0]; // id  
52                 String statyear = values[1]; // statyear  
53                 String num = values[2]; //num  
54                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
55             }  
56         }  
57     }  
58       
59     // reduce過程  
60     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {  
61         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {  
62             List<String> listA = new ArrayList<String>(); // 存放來自表A的值  
63             List<String> listB = new ArrayList<String>(); // 存放來自表B的值  
64             while (values.hasNext()) {
65                 String value = values.next().toString();  
66                 if (value.startsWith("a#")) {  
67                     listA.add(value.substring(2));  
68                 } else if (value.startsWith("b#")) {  
69                     listB.add(value.substring(2));  
70                 }  
71             }
72             int sizeA = listA.size();  
73             int sizeB = listB.size();  
74             // 遍歷兩個向量  
75             int i, j;  
76             for (i = 0; i < sizeA; i ++) {  
77                 for (j = 0; j < sizeB; j ++) {  
78                     output.collect(key, new Text(listA.get(i) + DELIMITER +listB.get(j)));  
79                 }  
80             }     
81         }  
82     }  
83       
84     protected void configJob(JobConf conf) {  
85         conf.setMapOutputKeyClass(Text.class);  
86         conf.setMapOutputValueClass(Text.class);  
87         conf.setOutputKeyClass(Text.class);  
88         conf.setOutputValueClass(Text.class);  
89         conf.setOutputFormat(ReportOutFormat.class);  
90     }  
91 }  

 三.技術細節

下面說一下其中的若干技術細節:
(1)因爲輸入數據涉及兩張表,咱們須要判斷當前處理的記錄是來自表A仍是來自表B。Reporter類getInputSplit()方法能夠獲取輸入數據的路徑,具體代碼以下:
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2)map的輸出的結果,同id的全部記錄(無論來自表A仍是表B)都在同一個key下保存在同一個列表中,在reduce階段須要將其拆開,保存爲至關於笛卡爾積的m x n條記錄。因爲事先不知道m、n是多少,這裏使用了兩個向量(可增加數組)來分別保存來自表A和表B的記錄,再用一個兩層嵌套循環組織出咱們須要的最終結果。
(3)在MapReduce中可使用System.out.println()方法輸出,以方便調試。不過System.out.println()的內容不會在終端顯示,而是輸出到了stdout和stderr這兩個文件中,這兩個文件位於logs/userlogs/attempt_xxx目錄下。能夠經過web端的歷史job查看中的「Analyse This Job」來查看stdout和stderr的內容。
相關文章
相關標籤/搜索