1. 概述 html
在傳統數據庫(如:MYSQL)中,JOIN操做是很是常見且很是耗時的。而在HADOOP中進行JOIN操做,一樣常見且耗時,因爲Hadoop的獨特設計思想,當進行JOIN操做時,有一些特殊的技巧。 java
本文首先介紹了Hadoop上一般的JOIN實現方法,而後給出了幾種針對不一樣輸入數據集的優化方法。 node
2. 常見的join方法介紹 面試
假設要進行join的數據分別來自File1和File2. 數據庫
2.1 reduce side join apache
reduce side join是一種最簡單的join方式,其主要思想以下: 緩存
在map階段,map函數同時讀取兩個文件File1和File2,爲了區分兩種來源的key/value數據對,對每條數據打一個標籤(tag),好比:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不一樣文件中的數據打標籤。 網絡
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 而後對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的鏈接操做。 app
REF:hadoop join之reduce side join maven
http://blog.csdn.net/huashetianzu/article/details/7819244
2.2 map side join
之因此存在reduce side join,是由於在map階段不能獲取全部須要的join字段,即:同一個key對應的字段可能位於不一樣map中。Reduce side join是很是低效的,由於shuffle階段要進行大量的數據傳輸。
Map side join是針對如下場景進行的優化:兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣,咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。
爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://namenode:9000/home/XXX/file,其中9000是本身配置的NameNode端口號)。JobTracker在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。
REF:hadoop join之map side join
http://blog.csdn.net/huashetianzu/article/details/7821674
2.3 Semi Join
Semi Join,也叫半鏈接,是從分佈式數據庫中借鑑過來的方法。它的產生動機是:對於reduce side join,跨機器的數據傳輸量很是大,這成了join操做的一個瓶頸,若是可以在map端過濾掉不會參加join操做的數據,則能夠大大節省網絡IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件通常很小,能夠放到內存中。在map階段,使用DistributedCache將File3複製到各個TaskTracker上,而後將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工做與reduce side join相同。
更多關於半鏈接的介紹,可參考:半鏈接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
REF:hadoop join之semi join
http://blog.csdn.net/huashetianzu/article/details/7823326
2.4 reduce side join + BloomFilter
在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。
BloomFilter最多見的做用是:判斷某個元素是否在一個集合裏面。它最重要的兩個方法是:add() 和contains()。最大的特色是不會存在 false negative,即:若是contains()返回false,則該元素必定不在集合中,但會存在必定的 false positive,即:若是contains()返回true,則該元素必定可能在集合中。
於是可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。
更多關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500
3. 二次排序
在Hadoop中,默認狀況下是按照key進行排序,若是要按照value進行排序怎麼辦?即:對於同一個key,reduce函數接收到的value list是按照value排序的。這種應用需求在join操做中很常見,好比,但願相同的key中,小表對應的value排在前面。
有兩種方法進行二次排序,分別爲:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort,主要思想是:在reduce()函數中,將某個key對應的全部value保存下來,而後進行排序。 這種方法最大的缺點是:可能會形成out of memory。
對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果即是先按key排序,後按value排序的結果,須要注意的是,用戶須要本身實現Paritioner,以便只按照key進行數據劃分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設置排序group的key值,具體參考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html
4. 後記
最近一直在找工做,因爲簡歷上寫了熟悉Hadoop,因此幾乎每一個面試官都會問一些Hadoop相關的東西,而 Hadoop上Join的實現就成了一道必問的問題,而極個別公司還會涉及到DistributedCache原理以及怎樣利用DistributedCache進行Join操做。爲了更好地應對這些面試官,特整理此文章。
5. 參考資料
(1) 書籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park
(2) 書籍《Hadoop In Action》page 107~131
(3) mapreduce的二次排序 SecondarySort:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html
(4) 半鏈接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
(5) BloomFilter介紹:http://blog.csdn.net/jiaomeng/article/details/1495500
(6)本文來自:http://dongxicheng.org/mapreduce/hadoop-join-two-tables/
————————————————————————————————————————————————
看完了上面的 hadoop 中 MR 常規 join 思路,下面咱們來看一種比較極端的例子,大表 join 小表,而小表的大小在 5M 如下的狀況:
之因此我這裏說小表要限制 5M 如下,是由於我這裏用到的思路是 :
file-》jar-》main String configuration -》configuration map HashMap
步驟:
一、從jar裏面讀取的文件內容以String的形式存在main方法的 configuration context 全局環境變量裏
二、在map函數裏讀取 context 環境變量的字符串,而後split字符串組建小表成爲一個HashMap
這樣一個大表關聯小表的例子就ok了,因爲context是放在namenode上的,而namenode對內存是有限制的,
因此你的小表文件不要太大,這樣咱們能夠比較的方便的利用 context 作join了。
這種方式其實就是 2.2 map side join 的一種具體實現而已。
Talk is cheap, show you the code~
public class Test { public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> { Configuration config = null; HashSet<String> idSet = new HashSet<String>(); HashMap<String, String> cityIdNameMap = new HashMap<String, String>(); Map<String, String> houseTypeMap = new HashMap<String, String>(); public void setup(Context context) { config = context.getConfiguration(); if (config == null) return; String idStr = config.get("idStr"); String[] idArr = idStr.split(","); for (String id : idArr) { idSet.add(id); } String cityIdNameStr = config.get("cityIdNameStr"); String[] cityIdNameArr = cityIdNameStr.split(","); for (String cityIdName : cityIdNameArr) { cityIdNameMap.put(cityIdName.split("\t")[0], cityIdName.split("\t")[1]); } houseTypeMap.put("8", "Test"); } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] info = value.toString().split("\\|"); String insertDate = info[InfoField.InsertDate].split(" ")[0] .split("-")[0]; // date: 2012-10-01 insertDate = insertDate + info[InfoField.InsertDate].split(" ")[0].split("-")[1]; // date:201210 String userID = info[InfoField.UserID]; // userid if (!idSet.contains(userID)) { return; } String disLocalID = ""; String[] disLocalIDArr = info[InfoField.DisLocalID].split(","); if (disLocalIDArr.length >= 2) { disLocalID = disLocalIDArr[1]; } else { try { disLocalID = disLocalIDArr[0]; } catch (Exception e) { e.printStackTrace(); return; } } String localValue = cityIdNameMap.get(disLocalID); disLocalID = localValue == null ? disLocalID : localValue; // city String[] cateIdArr = info[InfoField.CateID].split(","); String cateId = ""; String secondType = ""; if (cateIdArr.length >= 3) { cateId = cateIdArr[2]; if (houseTypeMap.get(cateId) != null) { secondType = houseTypeMap.get(cateId); // secondType } else { return; } } else { return; } String upType = info[InfoField.UpType]; String outKey = insertDate + "_" + userID + "_" + disLocalID + "_" + secondType; String outValue = upType.equals("0") ? "1_1" : "1_0"; context.write(new Text(outKey), new Text(outValue)); } } public static class ReducerClass extends Reducer<Text, Text, NullWritable, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int pv = 0; int uv = 0; for (Text val : values) { String[] tmpArr = val.toString().split("_"); pv += Integer.parseInt(tmpArr[0]); uv += Integer.parseInt(tmpArr[1]); } String outValue = key + "_" + pv + "_" + uv; context.write(NullWritable.get(), new Text(outValue)); } } public String getResource(String fileFullName) throws IOException { // 返回讀取指定資源的輸入流 InputStream is = this.getClass().getResourceAsStream(fileFullName); BufferedReader br = new BufferedReader(new InputStreamReader(is)); String s = ""; String res = ""; while ((s = br.readLine()) != null) res = res.equals("") ? s : res + "," + s; return res; } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.exit(2); } String idStr = new Test().getResource("userIDList.txt"); String cityIdNameStr = new Test().getResource("cityIdName.txt"); conf.set("idStr", idStr); conf.set("cityIdNameStr", cityIdNameStr); Job job = new Job(conf, "test01"); // job.setInputFormatClass(TextInputFormat.class); job.setJarByClass(Test.class); job.setMapperClass(Test.MapperClass.class); job.setReducerClass(Test.ReducerClass.class); job.setNumReduceTasks(25); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath( job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
說明:
一、getResource() 方法指定了能夠從jar包中讀取配置文件,並拼接成一個String返回。
二、setup() 方法起到一個mapreduce前的初始化的工做,他的做用是從 context 中
獲取main中存入的配置文件字符串,並用來構建一個hashmap,放在map外面,
每一個node上MR前只被執行一次。
三、注意上面代碼的第 12五、126 行,conf.set(key, value) 中的 value 大小是由限制的,
在 0.20.x 版本中是 5M 的大小限制,若是大於此大小建議採用分佈式緩存讀文件的策略。
參考:解決 hadoop jobconf 限制爲5M的問題
http://my.oschina.net/132722/blog/174601
推薦閱讀:
PS:關於如何從jar包中讀取配置文件,請參考:
http://www.iteye.com/topic/483115
(2)讀取jar內資源文件
http://heipark.iteye.com/blog/1439114
(3)Java相對路徑讀取資源文件
http://lavasoft.blog.51cto.com/62575/265821/
(4)Java加載資源文件時的路徑問題
http://www.cnblogs.com/lmtoo/archive/2012/10/18/2729272.html
如何優雅讀取properties文件
http://blogread.cn/it/article/3262?f=wb
注意:
不能先 getResource() 獲取路徑而後讀取內容,
由於".../ResourceJar.jar!/resource/...."並非文件資源定位符的格式。
因此,若是jar包中的類源代碼用File f=new File(相對路徑);的形式,是不可能定位到文件資源的。
這也是爲何源代碼打包成jar文件後,調用jar包時會報出FileNotFoundException的癥結所在了。
但能夠經過Class類的getResourceAsStream()方法來直接獲取文件內容 ,
這種方法是如何讀取jar中的資源文件的,這一點對於咱們來講是透明的。
並且 getResource() 和 getResourceAsStream() 在 maven 項目下對於相對、絕對路徑的尋找規則貌似還不同:
System.out.println(QQWryFile.class.getResource("/qqwry.dat").getFile());
System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("qqwry.dat"));
System.out.println(QQWryFile.class.getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getResourceAsStream("qqwry.dat"));
TIPS:Class和ClassLoader的getResourceAsStream()方法的區別:
這兩個方法仍是略有區別的, 之前一直不加以區分,直到今天發現要寫這樣的代碼的時候運行
錯誤, 才把這個問題澄清了一下。
基本上,兩個均可以用於從 classpath 裏面進行資源讀取, classpath包含classpath中的路徑
和classpath中的jar。
兩個方法的區別是資源的定義不一樣, 一個主要用於相對與一個object取資源,而另外一個用於取相對於classpath的
資源,用的是絕對路徑。
在使用Class.getResourceAsStream 時, 資源路徑有兩種方式, 一種以 / 開頭,則這樣的路徑是指定絕對
路徑, 若是不以 / 開頭, 則路徑是相對與這個class所在的包的。
在使用ClassLoader.getResourceAsStream時, 路徑直接使用相對於classpath的絕對路徑。
舉例,下面的三個語句,實際結果是同樣的:
com.explorers.Test.class.getResourceAsStream("abc.jpg") = com.explorers.Test.class.getResourceAsStream("/com/explorers/abc.jpg") = ClassLoader.getResourceAsStream("com/explorers/abc.jpg")