在大數據處理場景中,多表Join是很是常見的一類運算。爲了便於求解,一般會將多表join問題轉爲多個兩錶鏈接問題。兩表Join的實現算法很是多,通常咱們會根據兩表的數據特色選取不一樣的join算法,其中,最經常使用的兩個算法是map-side join和reduce-side join。本文將介紹如何在apache spark中實現這兩種算法。算法
(1)Map-side Joinspring
Map-side Join使用場景是一個大表和一個小表的鏈接操做,其中,「小表」是指文件足夠小,能夠加載到內存中。該算法能夠將join算子執行在Map端,無需經歷shuffle和reduce等階段,所以效率很是高。apache
在Hadoop MapReduce中, map-side join是藉助DistributedCache實現的。DistributedCache能夠幫咱們將小文件分發到各個節點的Task工做目錄下,這樣,咱們只需在程序中將文件加載到內存中(好比保存到Map數據結構中),而後藉助Mapper的迭代機制,遍歷另外一個大表中的每一條記錄,並查找是否在小表中,若是在則輸出,不然跳過。編程
在Apache Spark中,一樣存在相似於DistributedCache的功能,稱爲「廣播變量」(Broadcast variable)。其實現原理與DistributedCache很是相似,但提供了更多的數據/文件廣播算法,包括高效的P2P算法,該算法在節點數目很是多的場景下,效率遠遠好於DistributedCache這種基於HDFS共享存儲的方式,具體比較可參考「Performance and Scalability of Broadcast in Spark」。使用MapReduce DistributedCache時,用戶須要顯示地使用File API編寫程序從本地讀取小表數據,而Spark則不用,它藉助Scala語言強大的函數閉包特性,能夠隱藏數據/文件廣播過程,讓用戶編寫程序更加簡單。數據結構
假設兩個文件,一小一大,且格式相似爲:閉包
Key,value,valueapp
Key,value,valueide
則利用Spark實現map-side的算法以下:函數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
var
table
1
=
sc.textFile(args(
1
))
var
table
2
=
sc.textFile(args(
2
))
// table1 is smaller, so broadcast it as a map<String, String>
var
pairs
=
table
1
.map { x
=
>
var
pos
=
x.indexOf(
','
)
(x.substring(
0
, pos), x.substring(pos +
1
))
}.collectAsMap
var
broadCastMap
=
sc.broadcast(pairs)
//save table1 as map, and broadcast it
// table2 join table1 in map side
var
result
=
table
2
.map { x
=
>
var
pos
=
x.indexOf(
','
)
(x.substring(
0
, pos), x.substring(pos +
1
))
}.mapPartitions({ iter
=
>
var
m
=
broadCastMap.value
for
{
(key, value) <- iter
if
(m.contains(key))
}
yield
(key, (value, m.get(key).getOrElse(
""
)))
})
result.saveAsTextFile(args(
3
))
//save result to local file or HDFS
|
(2)Reduce-side Joinoop
當兩個文件/目錄中的數據很是大,難以將某一個存放到內存中時,Reduce-side Join是一種解決思路。該算法須要經過Map和Reduce兩個階段完成,在Map階段,將key相同的記錄劃分給同一個Reduce Task(需標記每條記錄的來源,便於在Reduce階段合併),在Reduce階段,對key相同的進行合併。
Spark提供了Join算子,能夠直接經過該算子實現reduce-side join,但要求RDD中的記錄必須是pair,即RDD[KEY, VALUE],一樣前一個例利用Reduce-side join實現以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
var
table
1
=
sc.textFile(args(
1
))
var
table
2
=
sc.textFile(args(
2
))
var
pairs
=
table
1
.map{x
=
>
var
pos
=
x.indexOf(
','
)
(x.substring(
0
, pos), x.substring(pos +
1
))
}
var
result
=
table
2
.map{x
=
>
var
pos
=
x.indexOf(
','
)
(x.substring(
0
, pos), x.substring(pos +
1
))
}.join(pairs)
result.saveAsTextFile(args(
3
))
|
(3)總結
本文介紹了Spark中map-side join和reduce-side join的編程思路,但願對你們有借鑑意義。但須要注意的是,在使用這兩種算法處理較大規模的數據時,一般須要對多個參數進行調優,不然可能會產生OOM問題。一般須要調優的相關參數包括,map端數據輸出buffer大小,reduce端數據分組方法(基於map仍是基於sort),等等。
(4)兩個問題
問題1:若是在map-side join中,不使用如下語句對文件1進行廣播,
var broadCastMap = sc.broadcast(pairs) 也能夠在後面程序中直接使用變量pairs存儲的數據進行join,這兩種方式有什麼異同,性能會有何不一樣? 問題2:將map-side join中的如下語句:123456mapPartitions({ iter => var m = broadCastMap.value for{ (key, value) <- iter if(m.contains(key)) } yield (key, (value, m.get(key).getOrElse("")))改成:1234var m = broadCastMap.value //這一句放在var table2 = sc.textFile(args(2))後面 map {case (key, value) => if(m.contains(key)) (key, (value, m.get(key).getOrElse(""))) }最終結果是有問題的,爲何? 本文兩個示例程序能夠從百度網盤上下載,地址爲Spark-Join-Exmaple。