spark join

 

在大數據處理場景中,多表Join是很是常見的一類運算。爲了便於求解,一般會將多表join問題轉爲多個兩錶鏈接問題。兩表Join的實現算法很是多,通常咱們會根據兩表的數據特色選取不一樣的join算法,其中,最經常使用的兩個算法是map-side join和reduce-side join。本文將介紹如何在apache spark中實現這兩種算法。算法

(1)Map-side Joinspring

Map-side Join使用場景是一個大表和一個小表的鏈接操做,其中,「小表」是指文件足夠小,能夠加載到內存中。該算法能夠將join算子執行在Map端,無需經歷shuffle和reduce等階段,所以效率很是高。apache

在Hadoop MapReduce中, map-side join是藉助DistributedCache實現的。DistributedCache能夠幫咱們將小文件分發到各個節點的Task工做目錄下,這樣,咱們只需在程序中將文件加載到內存中(好比保存到Map數據結構中),而後藉助Mapper的迭代機制,遍歷另外一個大表中的每一條記錄,並查找是否在小表中,若是在則輸出,不然跳過。編程

在Apache Spark中,一樣存在相似於DistributedCache的功能,稱爲「廣播變量」(Broadcast variable)。其實現原理與DistributedCache很是相似,但提供了更多的數據/文件廣播算法,包括高效的P2P算法,該算法在節點數目很是多的場景下,效率遠遠好於DistributedCache這種基於HDFS共享存儲的方式,具體比較可參考「Performance and Scalability of Broadcast in Spark」。使用MapReduce DistributedCache時,用戶須要顯示地使用File API編寫程序從本地讀取小表數據,而Spark則不用,它藉助Scala語言強大的函數閉包特性,能夠隱藏數據/文件廣播過程,讓用戶編寫程序更加簡單。數據結構

假設兩個文件,一小一大,且格式相似爲:閉包

Key,value,valueapp

Key,value,valueide

則利用Spark實現map-side的算法以下:函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var table 1 = sc.textFile(args( 1 ))
var table 2 = sc.textFile(args( 2 ))
 
// table1 is smaller, so broadcast it as a map<String, String>
var pairs = table 1 .map { x = >
   var pos = x.indexOf( ',' )
   (x.substring( 0 , pos), x.substring(pos + 1 ))
}.collectAsMap
var broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it
 
// table2 join table1 in map side
var result = table 2 .map { x = >
   var pos = x.indexOf( ',' )
   (x.substring( 0 , pos), x.substring(pos + 1 ))
}.mapPartitions({ iter = >
   var m = broadCastMap.value
   for {
     (key, value) <- iter
     if (m.contains(key))
   } yield (key, (value, m.get(key).getOrElse( "" )))
})
 
result.saveAsTextFile(args( 3 )) //save result to local file or HDFS

(2)Reduce-side Joinoop

當兩個文件/目錄中的數據很是大,難以將某一個存放到內存中時,Reduce-side Join是一種解決思路。該算法須要經過Map和Reduce兩個階段完成,在Map階段,將key相同的記錄劃分給同一個Reduce Task(需標記每條記錄的來源,便於在Reduce階段合併),在Reduce階段,對key相同的進行合併。

Spark提供了Join算子,能夠直接經過該算子實現reduce-side join,但要求RDD中的記錄必須是pair,即RDD[KEY, VALUE],一樣前一個例利用Reduce-side join實現以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var table 1 = sc.textFile(args( 1 ))
var table 2 = sc.textFile(args( 2 ))
 
var pairs = table 1 .map{x = >
   var pos = x.indexOf( ',' )
   (x.substring( 0 , pos), x.substring(pos + 1 ))
}
 
var result = table 2 .map{x = >
   var pos = x.indexOf( ',' )
   (x.substring( 0 , pos), x.substring(pos + 1 ))
}.join(pairs)
 
result.saveAsTextFile(args( 3 ))

(3)總結

本文介紹了Spark中map-side join和reduce-side join的編程思路,但願對你們有借鑑意義。但須要注意的是,在使用這兩種算法處理較大規模的數據時,一般須要對多個參數進行調優,不然可能會產生OOM問題。一般須要調優的相關參數包括,map端數據輸出buffer大小,reduce端數據分組方法(基於map仍是基於sort),等等。

(4)兩個問題

問題1:若是在map-side join中,不使用如下語句對文件1進行廣播,

var broadCastMap = sc.broadcast(pairs)
也能夠在後面程序中直接使用變量pairs存儲的數據進行join,這兩種方式有什麼異同,性能會有何不一樣?
問題2:將map-side join中的如下語句:123456mapPartitions({ iter =>   var m = broadCastMap.value   for{     (key, value) <- iter     if(m.contains(key))   } yield (key, (value, m.get(key).getOrElse("")))改成:1234var m = broadCastMap.value //這一句放在var table2 = sc.textFile(args(2))後面 map {case (key, value) =>   if(m.contains(key)) (key, (value, m.get(key).getOrElse(""))) }最終結果是有問題的,爲何?  本文兩個示例程序能夠從百度網盤上下載,地址爲Spark-Join-Exmaple。
相關文章
相關標籤/搜索