Apache Spark探祕:實現Map-side Join和Reduce-side Join

做者: Dong  | 新浪微博: 西成懂  | 能夠轉載, 但必須以超連接形式標明文章原始出處和做者信息及 版權聲明
網址: http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
本博客的文章集合: http://dongxicheng.org/recommend/

本博客微信公共帳號:hadoop123(微信號爲:hadoop-123),分享hadoop技術內幕,hadoop最新技術進展,發佈hadoop相關職位和求職信息,hadoop技術交流聚會、講座以及會議等。二維碼以下:
算法


在大數據處理場景中,多表Join是很是常見的一類運算。爲了便於求解,一般會將多表join問題轉爲多個兩錶鏈接問題。兩表Join的實現算法很是多,通常咱們會根據兩表的數據特色選取不一樣的join算法,其中,最經常使用的兩個算法是map-side join和reduce-side join。本文將介紹如何在apache spark中實現這兩種算法。

(1)Map-side Join spring

Map-side Join使用場景是一個大表和一個小表的鏈接操做,其中,「小表」是指文件足夠小,能夠加載到內存中。該算法能夠將join算子執行在Map端,無需經歷shuffle和reduce等階段,所以效率很是高。 apache

在Hadoop MapReduce中, map-side join是藉助DistributedCache實現的。DistributedCache能夠幫咱們將小文件分發到各個節點的Task工做目錄下,這樣,咱們只需在程序中將文件加載到內存中(好比保存到Map數據結構中),而後藉助Mapper的迭代機制,遍歷另外一個大表中的每一條記錄,並查找是否在小表中,若是在則輸出,不然跳過。 編程

在Apache Spark中,一樣存在相似於DistributedCache的功能,稱爲「廣播變量」(Broadcast variable)。其實現原理與DistributedCache很是相似,但提供了更多的數據/文件廣播算法,包括高效的P2P算法,該算法在節點數目很是多的場景下,效率遠遠好於DistributedCache這種基於HDFS共享存儲的方式,具體比較可參考「Performance and Scalability of Broadcast in Spark」。使用MapReduce DistributedCache時,用戶須要顯示地使用File API編寫程序從本地讀取小表數據,而Spark則不用,它藉助Scala語言強大的函數閉包特性,能夠隱藏數據/文件廣播過程,讓用戶編寫程序更加簡單。 微信

假設兩個文件,一小一大,且格式相似爲: 數據結構

Key,value,value 閉包

Key,value,value app

則利用Spark實現map-side的算法以下: ide

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vartable1=sc.textFile(args(1))
vartable2=sc.textFile(args(2))
 
// table1 is smaller, so broadcast it as a map<String, String>
varpairs=table1.map { x=>
  varpos=x.indexOf(',')
  (x.substring(0, pos), x.substring(pos +1))
}.collectAsMap
varbroadCastMap=sc.broadcast(pairs)//save table1 as map, and broadcast it
 
// table2 join table1 in map side
varresult=table2.map { x=>
  varpos=x.indexOf(',')
  (x.substring(0, pos), x.substring(pos +1))
}.mapPartitions({ iter=>
  varm=broadCastMap.value
  for{
    (key, value) <- iter
    if(m.contains(key))
  }yield(key, (value, m.get(key).getOrElse("")))
})
 
result.saveAsTextFile(args(3))//save result to local file or HDFS

(2)Reduce-side Join 函數

當兩個文件/目錄中的數據很是大,難以將某一個存放到內存中時,Reduce-side Join是一種解決思路。該算法須要經過Map和Reduce兩個階段完成,在Map階段,將key相同的記錄劃分給同一個Reduce Task(需標記每條記錄的來源,便於在Reduce階段合併),在Reduce階段,對key相同的進行合併。

Spark提供了Join算子,能夠直接經過該算子實現reduce-side join,但要求RDD中的記錄必須是pair,即RDD[KEY, VALUE],一樣前一個例利用Reduce-side join實現以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vartable1=sc.textFile(args(1))
vartable2=sc.textFile(args(2))
 
varpairs=table1.map{x=>
  varpos=x.indexOf(',')
  (x.substring(0, pos), x.substring(pos +1))
}
 
varresult=table2.map{x=>
  varpos=x.indexOf(',')
  (x.substring(0, pos), x.substring(pos +1))
}.join(pairs)
 
result.saveAsTextFile(args(3))

(3)總結

本文介紹了Spark中map-side join和reduce-side join的編程思路,但願對你們有借鑑意義。但須要注意的是,在使用這兩種算法處理較大規模的數據時,一般須要對多個參數進行調優,不然可能會產生OOM問題。一般須要調優的相關參數包括,map端數據輸出buffer大小,reduce端數據分組方法(基於map仍是基於sort),等等。

(4)兩個問題

問題1:若是在map-side join中,不使用如下語句對文件1進行廣播,

var broadCastMap = sc.broadcast(pairs)
也能夠在後面程序中直接使用變量pairs存儲的數據進行join,這兩種方式有什麼異同,性能會有何不一樣?
問題2:將map-side join中的如下語句:


  
  
  
  
  

 
1
2
3
4
5
6
mapPartitions({ iter=>
  varm=broadCastMap.value
  for{
    (key, value) <- iter
    if(m.contains(key))
  }yield(key, (value, m.get(key).getOrElse("")))

改成:

1
2
3
4
varm=broadCastMap.value//這一句放在var table2 = sc.textFile(args(2))後面
map {case(key, value)=>
  if(m.contains(key)) (key, (value, m.get(key).getOrElse("")))
}

最終結果是有問題的,爲何? 本文兩個示例程序能夠從百度網盤上下載,地址爲Spark-Join-Exmaple

原創文章,轉載請註明: 轉載自董的博客

本文連接地址: http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/

做者:Dong,做者介紹:http://dongxicheng.org/about/

本博客的文章集合:http://dongxicheng.org/recommend/

相關文章
相關標籤/搜索