reduce side join是一種最簡單的join方式,其主要思想以下:html
在map階段,map函數同時讀取兩個文件File1和File2,爲了區分兩種來源的key/value對,對每條數據打一個標籤(tag),好比:tag=1表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不一樣文件中的數據打標籤,在shuffle階段已經天然按key分組.java
在reduce階段,reduce函數獲取相同k2的v2 list(v2來自File1和File2), 而後對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的鏈接操做。node
這種方法有2個問題:面試
1, map階段沒有對數據瘦身,shuffle的網絡傳輸和排序性能很低。 2, reduce端對2個集合作乘積計算,很耗內存,容易致使OOM。
我關於reduce side join的博文總結地址:http://www.cnblogs.com/DreamDrive/p/7692042.html數據庫
之因此存在reduce side join,是由於在map階段不能獲取全部須要的join字段,即:同一個key對應的字段可能位於不一樣map中。Reduce side join是很是低效的,由於shuffle階段要進行大量的數據傳輸。apache
Map side join是針對如下場景進行的優化:緩存
兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣,咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),服務器
而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。網絡
爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下:app
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://namenode:9000/home/XXX/file,其中9000是本身配置的NameNode端口號)。Job在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個Container的本地磁盤上。
(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。
這種方法的侷限性:
這種方法,要使用hadoop中的DistributedCache把小數據分佈到各個計算節點,每一個map節點都要把小數據庫加載到內存,按關鍵字創建索引。
這種方法有明顯的侷限性:有一份數據比較小,在map端,可以把它加載到內存,並進行join操做。
針對map join,能夠把一份數據存放到專門的內存服務器,在map()方法中,對每個<key,value>的輸入對,根據key到內存服務器中取出數據,進行鏈接
對其中一份數據在內存中創建BloomFilter,另一份數據在鏈接以前,用BloomFilter判斷它的key是否存在,若是不存在,那這個記錄是空鏈接,能夠忽略。
在mapreduce包裏看到有專門爲join設計的包,對這些包尚未學習,不知道怎麼使用,只是在這裏記錄下來,做個提醒。
jar: mapreduce-client-core.jar package: org.apache.hadoop.mapreduce.lib.join
有客戶數據customer和訂單數據orders。
customer
客戶編號 | 姓名 | 地址 | 電話 |
---|---|---|---|
1 | hanmeimei | ShangHai | 110 |
2 | leilei | BeiJing | 112 |
3 | lucy | GuangZhou | 119 |
** order**
訂單編號 | 客戶編號 | 其它字段被忽略 |
---|---|---|
1 | 1 | 50 |
2 | 1 | 200 |
3 | 3 | 15 |
4 | 3 | 350 |
5 | 3 | 58 |
6 | 1 | 42 |
7 | 1 | 352 |
8 | 2 | 1135 |
9 | 2 | 400 |
10 | 2 | 2000 |
11 | 2 | 300 |
要求對customer和orders按照客戶編號進行鏈接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不做要求
客戶編號 | 訂單編號 | 訂單金額 | 姓名 | 地址 | 電話 |
---|---|---|---|---|---|
1 | 1 | 50 | hanmeimei | ShangHai | 110 |
1 | 2 | 200 | hanmeimei | ShangHai | 110 |
1 | 6 | 42 | hanmeimei | ShangHai | 110 |
1 | 7 | 352 | hanmeimei | ShangHai | 110 |
2 | 8 | 1135 | leilei | BeiJing | 112 |
2 | 9 | 400 | leilei | BeiJing | 112 |
2 | 10 | 2000 | leilei | BeiJing | 112 |
2 | 11 | 300 | leilei | BeiJing | 112 |
3 | 3 | 15 | lucy | GuangZhou | 119 |
3 | 4 | 350 | lucy | GuangZhou | 119 |
3 | 5 | 58 | lucy | GuangZhou | 119 |
上代碼:
1 public class MapSideJoin extends Configured implements Tool { 2 // customer文件在hdfs上的位置。 3 private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt"; 4 //客戶數據表對應的實體類 5 private static class CustomerBean { 6 private int custId; 7 private String name; 8 private String address; 9 private String phone; 10 11 public CustomerBean() { 12 } 13 14 public CustomerBean(int custId, String name, String address,String phone) { 15 super(); 16 this.custId = custId; 17 this.name = name; 18 this.address = address; 19 this.phone = phone; 20 } 21 22 public int getCustId() { 23 return custId; 24 } 25 26 public String getName() { 27 return name; 28 } 29 30 public String getAddress() { 31 return address; 32 } 33 34 public String getPhone() { 35 return phone; 36 } 37 } 38 //客戶訂單對應的實體類 39 private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> { 40 private int custId; 41 private int orderId; 42 43 public void set(int custId, int orderId) { 44 this.custId = custId; 45 this.orderId = orderId; 46 } 47 48 public int getCustId() { 49 return custId; 50 } 51 52 public int getOrderId() { 53 return orderId; 54 } 55 56 @Override 57 public void write(DataOutput out) throws IOException { 58 out.writeInt(custId); 59 out.writeInt(orderId); 60 } 61 62 @Override 63 public void readFields(DataInput in) throws IOException { 64 custId = in.readInt(); 65 orderId = in.readInt(); 66 } 67 68 @Override 69 public int compareTo(CustOrderMapOutKey o) { 70 int res = Integer.compare(custId, o.custId); 71 return res == 0 ? Integer.compare(orderId, o.orderId) : res; 72 } 73 74 @Override 75 public boolean equals(Object obj) { 76 if (obj instanceof CustOrderMapOutKey) { 77 CustOrderMapOutKey o = (CustOrderMapOutKey)obj; 78 return custId == o.custId && orderId == o.orderId; 79 } else { 80 return false; 81 } 82 } 83 84 @Override 85 public String toString() { 86 return custId + "\t" + orderId; 87 } 88 } 89 90 private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> { 91 private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey(); 92 private final Text outputValue = new Text(); 93 /** 94 * 把表中每一行的客戶信息封裝成一個Map,存儲在內存中 95 * Map的key是客戶的id,value是封裝的客戶bean對象 96 */ 97 private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>(); 98 @Override 99 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 100 // 格式: 訂單編 客戶編號 訂單金額 101 String[] cols = value.toString().split("\t"); 102 if (cols.length < 3) { 103 return; 104 } 105 106 int custId = Integer.parseInt(cols[1]);// 取出客戶編號 107 CustomerBean customerBean = CUSTOMER_MAP.get(custId); 108 109 if (customerBean == null) {// 沒有對應的customer信息能夠鏈接 110 return; 111 } 112 113 StringBuffer sb = new StringBuffer(); 114 sb.append(cols[2]).append("\t") 115 .append(customerBean.getName()).append("\t") 116 .append(customerBean.getAddress()).append("\t") 117 .append(customerBean.getPhone()); 118 outputValue.set(sb.toString()); 119 outputKey.set(custId, Integer.parseInt(cols[0])); 120 context.write(outputKey, outputValue); 121 } 122 123 //在Mapper方法執行前執行 124 @Override 125 protected void setup(Context context) throws IOException, InterruptedException { 126 FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration()); 127 FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL)); 128 129 BufferedReader reader = new BufferedReader(new InputStreamReader(fdis)); 130 String line = null; 131 String[] cols = null; 132 133 // 格式:客戶編號 姓名 地址 電話 134 while ((line = reader.readLine()) != null) { 135 cols = line.split("\t"); 136 if (cols.length < 4) {// 數據格式不匹配,忽略 137 continue; 138 } 139 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]); 140 CUSTOMER_MAP.put(bean.getCustId(), bean); 141 } 142 } 143 } 144 145 /** 146 * reduce 147 */ 148 private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> { 149 @Override 150 protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 151 // 什麼事都不用作,直接輸出 152 for (Text value : values) { 153 context.write(key, value); 154 } 155 } 156 } 157 /** 158 * @param args 159 * @throws Exception 160 */ 161 public static void main(String[] args) throws Exception { 162 if (args.length < 2) { 163 new IllegalArgumentException("Usage: <inpath> <outpath>"); 164 return; 165 } 166 ToolRunner.run(new Configuration(), new Join(), args); 167 } 168 169 @Override 170 public int run(String[] args) throws Exception { 171 Configuration conf = getConf(); 172 Job job = Job.getInstance(conf, Join.class.getSimpleName()); 173 job.setJarByClass(SecondarySortMapReduce.class); 174 175 // 添加customer cache文件 176 job.addCacheFile(URI.create(CUSTOMER_CACHE_URL)); 177 178 FileInputFormat.addInputPath(job, new Path(args[0])); 179 FileOutputFormat.setOutputPath(job, new Path(args[1])); 180 181 // map settings 182 job.setMapperClass(JoinMapper.class); 183 job.setMapOutputKeyClass(CustOrderMapOutKey.class); 184 job.setMapOutputValueClass(Text.class); 185 186 // reduce settings 187 job.setReducerClass(JoinReducer.class); 188 job.setOutputKeyClass(CustOrderMapOutKey.class); 189 job.setOutputKeyClass(Text.class); 190 191 boolean res = job.waitForCompletion(true); 192 return res ? 0 : 1; 193 } 194 }
上面的代碼沒有使用DistributedCache類:
5.Map Side Join的再一個例子:
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.filecache.DistributedCache; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.Tool; 17 import org.apache.hadoop.util.ToolRunner; 18 import org.slf4j.Logger; 19 import org.slf4j.LoggerFactory; 20 /** 21 * 用途說明: 22 * Map side join中的left outer join 23 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段 24 * table1(左表):tb_dim_city 25 * (id int,name string,orderid int,city_code int,is_show int), 26 * 假設tb_dim_city文件記錄數不多 27 * tb_dim_city.dat文件內容,分隔符爲"|": 28 * id name orderid city_code is_show 29 * 0 其餘 9999 9999 0 30 * 1 長春 1 901 1 31 * 2 吉林 2 902 1 32 * 3 四平 3 903 1 33 * 4 松原 4 904 1 34 * 5 通化 5 905 1 35 * 6 遼源 6 906 1 36 * 7 白城 7 907 1 37 * 8 白山 8 908 1 38 * 9 延吉 9 909 1 39 * -------------------------風騷的分割線------------------------------- 40 * table2(右表):tb_user_profiles 41 * (userID int,userName string,network string,flow double,cityID int) 42 * tb_user_profiles.dat文件內容,分隔符爲"|": 43 * userID network flow cityID 44 * 1 2G 123 1 45 * 2 3G 333 2 46 * 3 3G 555 1 47 * 4 2G 777 3 48 * 5 3G 666 4 49 * .................................. 50 * .................................. 51 * -------------------------風騷的分割線------------------------------- 52 * 結果: 53 * 1 長春 1 901 1 1 2G 123 54 * 1 長春 1 901 1 3 3G 555 55 * 2 吉林 2 902 1 2 3G 333 56 * 3 四平 3 903 1 4 2G 777 57 * 4 松原 4 904 1 5 3G 666 58 */ 59 public class MapSideJoinMain extends Configured implements Tool{ 60 private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); 61 62 public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> { 63 private HashMap<String,String> city_infoMap = new HashMap<String, String>(); 64 private Text outPutKey = new Text(); 65 private Text outPutValue = new Text(); 66 private String mapInputStr = null; 67 private String mapInputSpit[] = null; 68 private String city_secondPart = null; 69 /** 70 * 此方法在每一個task開始以前執行,這裏主要用做從DistributedCache 71 * 中取到tb_dim_city文件,並將裏邊記錄取出放到內存中。 72 */ 73 @Override 74 protected void setup(Context context) throws IOException, InterruptedException { 75 BufferedReader br = null; 76 //得到當前做業的DistributedCache相關文件 77 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 78 String cityInfo = null; 79 for(Path p : distributePaths){ 80 if(p.toString().endsWith("tb_dim_city.dat")){ 81 //讀緩存文件,並放到mem中 82 br = new BufferedReader(new FileReader(p.toString())); 83 while(null!=(cityInfo=br.readLine())){ 84 String[] cityPart = cityInfo.split("\\|",5); 85 if(cityPart.length ==5){ 86 city_infoMap.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]); 87 } 88 } 89 } 90 } 91 } 92 93 /** 94 * Map端的實現至關簡單,直接判斷tb_user_profiles.dat中的 95 * cityID是否存在個人map中就ok了,這樣就能夠實現Map Join了 96 */ 97 @Override 98 protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { 99 //排掉空行 100 if(value == null || value.toString().equals("")){ 101 return; 102 } 103 mapInputStr = value.toString(); 104 mapInputSpit = mapInputStr.split("\\|",4); 105 //過濾非法記錄 106 if(mapInputSpit.length != 4){ 107 return; 108 } 109 //判斷連接字段是否在map中存在 110 city_secondPart = city_infoMap.get(mapInputSpit[3]); 111 if(city_secondPart != null){ 112 this.outPutKey.set(mapInputSpit[3]); 113 this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]); 114 context.write(outPutKey, outPutValue); 115 } 116 } 117 } 118 @Override 119 public int run(String[] args) throws Exception { 120 Configuration conf=getConf(); //得到配置文件對象 121 DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//爲該job添加緩存文件 122 Job job=new Job(conf,"MapJoinMR"); 123 job.setNumReduceTasks(0); 124 125 FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 126 FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑 127 128 job.setJarByClass(MapSideJoinMain.class); 129 job.setMapperClass(LeftOutJoinMapper.class); 130 131 job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 132 job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式 133 134 //設置map的輸出key和value類型 135 job.setMapOutputKeyClass(Text.class); 136 137 //設置reduce的輸出key和value類型 138 job.setOutputKeyClass(Text.class); 139 job.setOutputValueClass(Text.class); 140 job.waitForCompletion(true); 141 return job.isSuccessful()?0:1; 142 } 143 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 144 try { 145 int returnCode = ToolRunner.run(new MapSideJoinMain(),args); 146 System.exit(returnCode); 147 } catch (Exception e) { 148 logger.error(e.getMessage()); 149 } 150 } 151 }
SemiJoin就是所謂的半鏈接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數據,在網絡中只傳輸參與鏈接的數據不參與鏈接的數據沒必要在網絡中進行傳輸,從而減小了shuffle的網絡傳輸量,使總體效率獲得提升,其餘思想和reduce join是如出一轍的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來經過DistributedCach分發到相關節點,而後將其取出放到內存中(能夠放到HashSet中),在map階段掃描鏈接表,將join key不在內存HashSet中的記錄過濾掉,讓那些參與join的記錄經過shuffle傳輸到reduce端進行join操做,其餘的和reduce join都是同樣的。看代碼:
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.HashSet; 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.conf.Configured; 8 import org.apache.hadoop.filecache.DistributedCache; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 import org.slf4j.Logger; 22 import org.slf4j.LoggerFactory; 23 /** 24 * @author zengzhaozheng 25 * 26 * 用途說明: 27 * reudce side join中的left outer join 28 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段 29 * table1(左表):tb_dim_city 30 * (id int,name string,orderid int,city_code,is_show) 31 * tb_dim_city.dat文件內容,分隔符爲"|": 32 * id name orderid city_code is_show 33 * 0 其餘 9999 9999 0 34 * 1 長春 1 901 1 35 * 2 吉林 2 902 1 36 * 3 四平 3 903 1 37 * 4 松原 4 904 1 38 * 5 通化 5 905 1 39 * 6 遼源 6 906 1 40 * 7 白城 7 907 1 41 * 8 白山 8 908 1 42 * 9 延吉 9 909 1 43 * -------------------------風騷的分割線------------------------------- 44 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 45 * tb_user_profiles.dat文件內容,分隔符爲"|": 46 * userID network flow cityID 47 * 1 2G 123 1 48 * 2 3G 333 2 49 * 3 3G 555 1 50 * 4 2G 777 3 51 * 5 3G 666 4 52 * .................................. 53 * .................................. 54 * -------------------------風騷的分割線------------------------------- 55 * joinKey.dat內容: 56 * city_code 57 * 1 58 * 2 59 * 3 60 * 4 61 * -------------------------風騷的分割線------------------------------- 62 * 結果: 63 * 1 長春 1 901 1 1 2G 123 64 * 1 長春 1 901 1 3 3G 555 65 * 2 吉林 2 902 1 2 3G 333 66 * 3 四平 3 903 1 4 2G 777 67 * 4 松原 4 904 1 5 3G 666 68 */ 69 public class SemiJoin extends Configured implements Tool{ 70 private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class); 71 public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> { 72 private CombineValues combineValues = new CombineValues(); 73 private HashSet<String> joinKeySet = new HashSet<String>(); 74 private Text flag = new Text(); 75 private Text joinKey = new Text(); 76 private Text secondPart = new Text(); 77 /** 78 * 將參加join的key從DistributedCache取出放到內存中,以便在map端將要參加join的key過濾出來。b 79 */ 80 @Override 81 protected void setup(Context context) throws IOException, InterruptedException { 82 BufferedReader br = null; 83 //得到當前做業的DistributedCache相關文件 84 Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 85 String joinKeyStr = null; 86 for(Path p : distributePaths){ 87 if(p.toString().endsWith("joinKey.dat")){ 88 //讀緩存文件,並放到mem中 89 br = new BufferedReader(new FileReader(p.toString())); 90 while(null!=(joinKeyStr=br.readLine())){ 91 joinKeySet.add(joinKeyStr); 92 } 93 } 94 } 95 } 96 @Override 97 protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { 98 //得到文件輸入路徑 99 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); 100 //數據來自tb_dim_city.dat文件,標誌即爲"0" 101 if(pathName.endsWith("tb_dim_city.dat")){ 102 String[] valueItems = value.toString().split("\\|"); 103 //過濾格式錯誤的記錄 104 if(valueItems.length != 5){ 105 return; 106 } 107 //過濾掉不須要參加join的記錄 108 if(joinKeySet.contains(valueItems[0])){ 109 flag.set("0"); 110 joinKey.set(valueItems[0]); 111 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]); 112 combineValues.setFlag(flag); 113 combineValues.setJoinKey(joinKey); 114 combineValues.setSecondPart(secondPart); 115 context.write(combineValues.getJoinKey(), combineValues); 116 }else{ 117 return ; 118 } 119 }//數據來自於tb_user_profiles.dat,標誌即爲"1" 120 else if(pathName.endsWith("tb_user_profiles.dat")){ 121 String[] valueItems = value.toString().split("\\|"); 122 //過濾格式錯誤的記錄 123 if(valueItems.length != 4){ 124 return; 125 } 126 //過濾掉不須要參加join的記錄 127 if(joinKeySet.contains(valueItems[3])){ 128 flag.set("1"); 129 joinKey.set(valueItems[3]); 130 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]); 131 combineValues.setFlag(flag); 132 combineValues.setJoinKey(joinKey); 133 combineValues.setSecondPart(secondPart); 134 context.write(combineValues.getJoinKey(), combineValues); 135 }else{ 136 return ; 137 } 138 } 139 } 140 } 141 public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> { 142 //存儲一個分組中的左表信息 143 private ArrayList<Text> leftTable = new ArrayList<Text>(); 144 //存儲一個分組中的右表信息 145 private ArrayList<Text> rightTable = new ArrayList<Text>(); 146 private Text secondPar = null; 147 private Text output = new Text(); 148 /** 149 * 一個分組調用一次reduce函數 150 */ 151 @Override 152 protected void reduce(Text key, Iterable<CombineValues> value, Context context) throws IOException, InterruptedException { 153 leftTable.clear(); 154 rightTable.clear(); 155 /** 156 * 將分組中的元素按照文件分別進行存放 157 * 這種方法要注意的問題: 158 * 若是一個分組內的元素太多的話,可能會致使在reduce階段出現OOM, 159 * 在處理分佈式問題以前最好先了解數據的分佈狀況,根據不一樣的分佈採起最 160 * 適當的處理方法,這樣能夠有效的防止致使OOM和數據過分傾斜問題。 161 */ 162 for(CombineValues cv : value){ 163 secondPar = new Text(cv.getSecondPart().toString()); 164 //左表tb_dim_city 165 if("0".equals(cv.getFlag().toString().trim())){ 166 leftTable.add(secondPar); 167 } 168 //右表tb_user_profiles 169 else if("1".equals(cv.getFlag().toString().trim())){ 170 rightTable.add(secondPar); 171 } 172 } 173 logger.info("tb_dim_city:"+leftTable.toString()); 174 logger.info("tb_user_profiles:"+rightTable.toString()); 175 for(Text leftPart : leftTable){ 176 for(Text rightPart : rightTable){ 177 output.set(leftPart+ "\t" + rightPart); 178 context.write(key, output); 179 } 180 } 181 } 182 } 183 @Override 184 public int run(String[] args) throws Exception { 185 Configuration conf=getConf(); //得到配置文件對象 186 DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf); 187 Job job=new Job(conf,"LeftOutJoinMR"); 188 job.setJarByClass(SemiJoin.class); 189 190 FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 191 FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑 192 193 job.setMapperClass(SemiJoinMapper.class); 194 job.setReducerClass(SemiJoinReducer.class); 195 196 job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 197 job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式 198 199 //設置map的輸出key和value類型 200 job.setMapOutputKeyClass(Text.class); 201 job.setMapOutputValueClass(CombineValues.class); 202 203 //設置reduce的輸出key和value類型 204 job.setOutputKeyClass(Text.class); 205 job.setOutputValueClass(Text.class); 206 job.waitForCompletion(true); 207 return job.isSuccessful()?0:1; 208 } 209 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 210 try { 211 int returnCode = ToolRunner.run(new SemiJoin(),args); 212 System.exit(returnCode); 213 } catch (Exception e) { 214 logger.error(e.getMessage()); 215 } 216 } 217 }
這裏還說說SemiJoin也是有必定的適用範圍的,其抽取出來進行join的key是要放到內存中的,因此不可以太大,容易在Map端形成OOM。
blog介紹了三種join方式。這三種join方式適用於不一樣的場景,其處理效率上的相差仍是蠻大的,其中主要致使因素是網絡傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分佈式大數據處理程序的時最好要對總體要處理的數據分佈狀況做一個瞭解,這能夠提升咱們代碼的效率,使數據的傾斜度降到最低,使咱們的代碼傾向性更好。