在沒有 pig 或者 hive 的環境下,直接在 mapreduce 中本身實現 join 是一件極其蛋疼的事情,MR中的join分爲好幾種,好比有最多見的 reduce side join,map side join,semi join 等。今天咱們要討論的是第 2 種:map side join,這種 join 在處理多個小表關聯大表時很是有用,而 reduce join 在處理多表關聯時是比較麻煩的,會形成大量的網絡IO,效率低下。 java
一、原理: shell
之因此存在reduce side join,是由於在map階段不能獲取全部須要的join字段,即:同一個key對應的字段可能位於不一樣map中。但 Reduce side join是很是低效的,由於shuffle階段要進行大量的數據傳輸。Map side join是針對如下場景進行的優化:兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣,咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下: apache
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。 緩存
(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。 網絡
二、環境: 架構
本實例須要的測試文件及 hdfs 文件存放目錄以下: app
hadoop fs -ls /test/decli
Found 4 items
-rw-r--r-- 2 root supergroup 152 2013-03-06 02:05 /test/decli/login
drwxr-xr-x - root supergroup 0 2013-03-06 02:45 /test/decli/output
-rw-r--r-- 2 root supergroup 12 2013-03-06 02:12 /test/decli/sex
-rw-r--r-- 2 root supergroup 72 2013-03-06 02:44 /test/decli/user
分佈式
測試文件內容分別爲: ide
root@master 192.168.120.236 02:58:03 ~/test/table >
cat login # 登陸表,須要判斷 uid 列是否有效,並獲得對應用戶名、性別、訪問次數
1 0 20121213
2 0 20121213
3 1 20121213
4 1 20121213
1 0 20121114
2 0 20121114
3 1 20121114
4 1 20121114
1 0 20121213
1 0 20121114
9 0 20121114
root@master 192.168.120.236 02:58:08 ~/test/table >
cat sex # 性別表
0 男
1 女
root@master 192.168.120.236 02:58:13 ~/test/table >
cat user # 用戶屬性表
1 張三 hubei
3 王五 tianjin
4 趙六 guangzhou
2 李四 beijing
root@master 192.168.120.236 02:58:16 ~/test/table >
oop
測試環境 hadoop 版本:
echo $HADOOP_HOME /work/hadoop-0.20.203.0
好了,廢話少說,上代碼:
三、代碼:
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MultiTableJoin extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用於緩存 sex、user 文件中的數據 private Map<String, String> userMap = new HashMap<String, String>(); private Map<String, String> sexMap = new HashMap<String, String>(); private Text oKey = new Text(); private Text oValue = new Text(); private String[] kv; // 此方法會在map方法執行以前執行 @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 從當前做業中獲取要緩存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context .getConfiguration()); String uidNameAddr = null; String sidSex = null; for (Path path : paths) { if (path.toString().contains("user")) { in = new BufferedReader(new FileReader(path.toString())); while (null != (uidNameAddr = in.readLine())) { userMap.put(uidNameAddr.split("\t", -1)[0], uidNameAddr.split("\t", -1)[1]); } } else if (path.toString().contains("sex")) { in = new BufferedReader(new FileReader(path.toString())); while (null != (sidSex = in.readLine())) { sexMap.put(sidSex.split("\t", -1)[0], sidSex.split( "\t", -1)[1]); } } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { kv = value.toString().split("\t"); // map join: 在map階段過濾掉不須要的數據 if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) { oKey.set(userMap.get(kv[0]) + "\t" + sexMap.get(kv[1])); oValue.set("1"); context.write(oKey, oValue); } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { private Text oValue = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } oValue.set(String.valueOf(sumCount)); context.write(key, oValue); } } public int run(String[] args) throws Exception { Job job = new Job(getConf(), "MultiTableJoin"); job.setJobName("MultiTableJoin"); job.setJarByClass(MultiTableJoin.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); // 咱們把第一、2個參數的地址做爲要緩存的文件路徑 DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job .getConfiguration()); DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job .getConfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[3])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[4])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new MultiTableJoin(), args); System.exit(res); } }
運行命令:
hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output
四、結果:
運行結果: root@master 192.168.120.236 02:47:18 ~/test/table >
hadoop fs -cat /test/decli/output/*|column -t
cat: File does not exist: /test/decli/output/_logs
張三 男 4
李四 男 2
王五 女 2
趙六 女 2
root@master 192.168.120.236 02:47:26 ~/test/table >
TIPS:
更多關於 hadoop mapreduce 相關 join 介紹,請參考以前的博文:
MapReduce 中的兩表 join 幾種方案簡介
http://my.oschina.net/leejun2005/blog/95186
本例中用到了分佈式緩存,關於分佈式緩存的一些特性與原理,以及注意事項,
請參考:
HDFS 原理、架構與特性介紹