spark基礎知識請參考spark官網:http://spark.apache.org/docs/1.2.1/quick-start.htmlhtml
不管是mapreduce仍是spark ,分佈式框架的性能優化方向大體分爲:負載均衡、網絡傳輸和磁盤I/O 這三塊。而spark是基於內存的計算框架,所以在編寫應用時須要充分利用其內存計算特徵。本篇主要針對sql
spark應用中的join問題進行討論,關於集羣參數的優化會在另外一篇文章中說起。數據庫
在傳統的數據庫平臺和分佈式計算平臺,join的性能消耗都是很可觀的,對spark來講若是join的表比較大,那麼在shuffle時網絡及磁盤壓力會明顯提高,嚴重時可能會形成excutor失敗致使任務沒法進行下去,apache
對這種join的優化方法主要是採用map和filter來改變join的實現方式,減小shuffle階段的網絡和磁盤I/O。下面以表的數據量大小分兩部分來討論。緩存
大表:數據量較大的表性能優化
小表:數據量較小的表網絡
1、大表與小表之間的join數據結構
這種join是大部分業務場景的主要join方式,將小表以broadcast的形式分發到每一個executor後對大表進行filter操做,如下對每種join進行示例說明(兼容表中ID不惟一的狀況)。負載均衡
一、leftOuterJoin 框架
>>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])
>>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])
原生實現方式:
>>>d1.leftOuterJoin(d2).collect()
>>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]
map實現方式(小表在右的實現方式,小表在左的狀況會稍微複雜些,須要多一些操做操做,實際場景中很少見):
def doJoin(row): result=[] if row[1][1] is not None: for i in row[1][1]: result+=[(row[0],(row[1][0],i))] else: result+=[row] return result d2_map={} for i in d2.groupByKey().collect(): d2_map[i[0]]=i[1] d2_broadcast=sc.broadcast(d2_map) d2_dict=d2_broadcast.value d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]
二、join
這裏的join指的是innerjoin即只取出匹配到的數據項,只須要在上面的實現方式中加個filter便可
d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]
2、大表與大表之間的join(Reduce-join)
大表之間的join沒法經過緩存數據來達到優化目的,所以須要把優化的重點放在分區效率及key的設計上
一、join的key值儘可能使用數值類型,減小分區及shuffle的操做時間,在join時數值類型的key值在匹配時更快
二、將過濾條件放在join以前,使得join的數據量儘可能最少
三、在join以前將兩個表按相同分區數進行從新分區
reduce-join:指將兩個表按key值進行分區,相同key的數據會被分在同一個分區,最後使用mapPartition進行join操做。
四、若是須要減小分區和並行度,請使用coalesce 而非repartition 方法。
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
3、其它優化方式
一、同一份數據被屢次用到,在讀入時進行緩存,後面直接使用,例如配置表,若是數據量不大則進行broadcast,不然使用cache
二、儘可能減小重複計算,一樣的計算邏輯只計算一次
三、幾個優化參數
spark.akka.frameSize 1000 集羣間通訊 一幀數據的大小,設置過小可能會致使通訊延遲
spark.akka.timeout 100 通訊等待最長時間(秒爲單位)
spark.akka.heartbeat.pauses 600 心跳失敗最大間隔(秒爲單位)
spark.serializer org.apache.spark.serializer.KryoSerializer 序列化方式(sprak本身的實現方式)
spark.sql.autoBroadcastJoinThreshold -1 禁止自動broadcast表
spark.shuffle.consolidateFiles true shuffle 自動合併小文件
4、後續優化方向
一、內存優化:對象所佔用的內存,訪問對象的消耗以及垃圾回收(garbage collection)所佔用的開銷
二、優化數據結構
三、優化RDD存儲
四、並行度