Spark 中的join方式(pySpark)

     spark基礎知識請參考spark官網:http://spark.apache.org/docs/1.2.1/quick-start.htmlhtml

    不管是mapreduce仍是spark ,分佈式框架的性能優化方向大體分爲:負載均衡、網絡傳輸和磁盤I/O 這三塊。而spark是基於內存的計算框架,所以在編寫應用時須要充分利用其內存計算特徵。本篇主要針對sql

spark應用中的join問題進行討論,關於集羣參數的優化會在另外一篇文章中說起。數據庫

    在傳統的數據庫平臺和分佈式計算平臺,join的性能消耗都是很可觀的,對spark來講若是join的表比較大,那麼在shuffle時網絡及磁盤壓力會明顯提高,嚴重時可能會形成excutor失敗致使任務沒法進行下去,apache

對這種join的優化方法主要是採用map和filter來改變join的實現方式,減小shuffle階段的網絡和磁盤I/O。下面以表的數據量大小分兩部分來討論。緩存

   大表:數據量較大的表性能優化

   小表:數據量較小的表網絡

1、大表與小表之間的join數據結構

   這種join是大部分業務場景的主要join方式,將小表以broadcast的形式分發到每一個executor後對大表進行filter操做,如下對每種join進行示例說明(兼容表中ID不惟一的狀況)。負載均衡

  一、leftOuterJoin 框架

  >>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])

  >>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])

  原生實現方式:

  >>>d1.leftOuterJoin(d2).collect()

  >>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]

   map實現方式(小表在右的實現方式,小表在左的狀況會稍微複雜些,須要多一些操做操做,實際場景中很少見):

def  doJoin(row):
    result=[]
    if row[1][1] is not None:
        for i in row[1][1]:
            result+=[(row[0],(row[1][0],i))]
  else:
            result+=[row]
  return result

d2_map={}
for i in d2.groupByKey().collect():
    d2_map[i[0]]=i[1]
d2_broadcast=sc.broadcast(d2_map)
d2_dict=d2_broadcast.value
d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()               

>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]

二、join

這裏的join指的是innerjoin即只取出匹配到的數據項,只須要在上面的實現方式中加個filter便可

d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()

>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]

 

2、大表與大表之間的join(Reduce-join)

大表之間的join沒法經過緩存數據來達到優化目的,所以須要把優化的重點放在分區效率及key的設計上

一、join的key值儘可能使用數值類型,減小分區及shuffle的操做時間,在join時數值類型的key值在匹配時更快

二、將過濾條件放在join以前,使得join的數據量儘可能最少

三、在join以前將兩個表按相同分區數進行從新分區

reduce-join:指將兩個表按key值進行分區,相同key的數據會被分在同一個分區,最後使用mapPartition進行join操做。

 四、若是須要減小分區和並行度,請使用coalesce 而非repartition 方法。

* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.

3、其它優化方式

一、同一份數據被屢次用到,在讀入時進行緩存,後面直接使用,例如配置表,若是數據量不大則進行broadcast,不然使用cache

二、儘可能減小重複計算,一樣的計算邏輯只計算一次

三、幾個優化參數

spark.akka.frameSize 1000                       集羣間通訊 一幀數據的大小,設置過小可能會致使通訊延遲

spark.akka.timeout 100                             通訊等待最長時間(秒爲單位)
spark.akka.heartbeat.pauses 600                 心跳失敗最大間隔(秒爲單位)
spark.serializer org.apache.spark.serializer.KryoSerializer    序列化方式(sprak本身的實現方式)
spark.sql.autoBroadcastJoinThreshold -1           禁止自動broadcast表
spark.shuffle.consolidateFiles true             shuffle 自動合併小文件

 

4、後續優化方向

一、內存優化:對象所佔用的內存,訪問對象的消耗以及垃圾回收(garbage collection)所佔用的開銷

二、優化數據結構

三、優化RDD存儲

四、並行度

相關文章
相關標籤/搜索