深刻研究 Apache Spark 3.0 的新功能

分享嘉賓 Apache Spark PMC 李瀟,就任於 Databricks,Spark 研發部主管,領導 Spark,Koalas,Databricks runtime,OEM 的研發團隊,在直播中爲你們深刻講解了Apache Spark 3.0的新功能。
html


直播回放:https://developer.aliyun.com/live/2894web

如下是直播內容精華整理。sql


Spark3.0解決了超過3400個JIRAs,歷時一年多,是整個社區集體智慧的成果。Spark SQL和Spark Cores是其中的核心模塊,其他模塊如PySpark等模塊均是創建在二者之上。Spark3.0新增了太多的功能,沒法一一列舉,下圖是其中24個相對來講比較重要的新功能,下文將會圍繞這些進行簡單介紹。數據庫

1、Performance

與性能相關的新功能主要有:apache

  • Adaptive Query Execution微信

  • Dynamic Partition Pruning架構

  • Query Complication Speedupapp

  • Join Hints框架

(一)Adaptive Query Execution

Adaptive Query Execution(AQE)在以前的版本里已經有所實現,可是以前的框架存在一些缺陷,致使使用不是不少,在Spark3.0中Databricks(Spark初創團隊建立的大數據與AI智能公司)和Intel的工程師合做,解決了相關的問題。函數

在Spark1.0中全部的Catalyst Optimizer都是基於規則 (rule) 優化的。爲了產生比較好的查詢規則,優化器須要理解數據的特性,因而在Spark2.0中引入了基於代價的優化器 (cost-based optimizer),也就是所謂的CBO。然而,CBO也沒法解決不少問題,好比:

  • 數據統計信息廣泛缺失,統計信息的收集代價較高;

  • 儲存計算分離的架構使得收集到的統計信息可能再也不準確;

  • Spark部署在某一單一的硬件架構上,cost很難被估計;

  • Spark的UDF(User-defined Function)簡單易用,種類繁多,可是對於CBO來講是個黑盒子,沒法估計其cost。

總而言之,因爲種種限制,Spark的優化器沒法產生最好的Plan。也正是由於上訴緣由,運行期的自適應調整就變得至關重要,對於Spark更是如此,因而有了AQE,其基本方法也很是簡單易懂。以下圖所示,在執行完部分的查詢規劃後,Spark能夠收集到結果的統計信息,而後利用這些信息再對查詢規劃從新進行優化。這個優化的過程不是一次性的,而是自適應的,也就是說隨着查詢規劃的執行會不斷的進行優化, 並且儘量地複用了現有優化器的已有優化規則。讓整個查詢優化變得更加靈活和自適應。

Spark3.0中AQE包括三個主要的運行期自適應功能:

  • 能夠基於運行期的統計信息,將Sort Merge Join 轉換爲Broadcast Hash Join;

  • 能夠基於數據運行中間結果的統計信息,減小reducer數量,避免數據在shuffle期間的過量分區致使性能損失;

  • 能夠處理數據分佈不均致使的skew join。

更多的信息你們能夠經過搜索引擎查詢瞭解。

若是你是一個Spark的資深用戶,可能你讀了不少的調優寶典,其中第一條就是讓你的Join變得更快的方法就是儘量地使用Broadcast Hash Join。好比你能夠增長spark.sql.autoBroadcastJoinThreshold 閾值,或者使用 broadcast HINT。可是這基本上屬於藝高人膽大。首先,這種方法很難調,一不當心就會Out of Memory,甚至性能變得更差,即便如今產生了必定效果,可是隨着負載的變化可能調優會徹底失敗。

也許你會想:Spark爲何不解決這個問題呢?這裏有不少挑戰,好比:

  • 統計信息的缺失,統計信息的不許確,那麼就是默認依據文件大小來預估表的大小,可是文件每每是壓縮的,尤爲是列存儲格式,好比parquet 和 ORC,而Spark是基於行處理,若是數據連續重複,file size可能和真實的行存儲的真實大小,差異很是之大。這也是爲什麼提升autoBroadcastJoinThreshold,即便不是太大也可能會致使out of memory;

  • Filter複雜、UDFs的使用都會使Spark沒法準確估計Join輸入數據量的大小。當你的query plan異常大和複雜的時候,這點尤爲明顯。

其中,Spark3.0中基於運行期的統計信息,將Sort Merge Join 轉換爲Broadcast Hash Join的過程以下圖所示。

也許你還會看到調優寶典告訴你調整shuffle產生的partitions的數量。而當前默認數量是200,可是這個200爲何就不得而知了。然而,這個值設置爲多少都不是最優的。其實在不一樣shuffle,數據的輸入大小和分佈絕大多數都是不同。那麼簡單地用一個配置,讓全部的shuffle來遵循,顯然是很差的。要設得過小,每一個partition的大小就會太大,那麼GC的壓力就會很大,aggregation和sort會更有可能的去spill數據到磁盤。可是,要是設太大,partition的大小就會過小,partition的數量會大。這個會致使沒必要要的IO,也讓task調度器的壓力劇增。那麼調度器會致使全部task都變慢。這一系列問題在query plan複雜的時候變得尤其突出,還可能會影響到其餘性能,最後耗時耗力卻調優失敗。

對於這個問題的解決,AQE就有優點了。以下圖所示,AQE能夠在運行期動態的調整partition來達到性能最優。

此外,數據分佈不均是Spark調優的一個疑難雜症,它的表現有多種,好比若干task停滯不前,像是出現了bugs,又好比大量的disk spilling會致使不少節點都無事可作。此外,你也許會看到out of memory這種異常。其解決方法也不少,好比找到skew values而後重寫query,或者在join的狀況下增長skew keys來消除數據分佈不均,可是不管哪一種方法,都很是浪費時間,且後期難以維護。AQE解決問題的方式以下,其經過shuffle落地後的中間數據結果判斷哪些partition是skew的,若是partition過大,就將其分紅若干較小的partition,經過分而治之,整體性能大幅提高。

AQE的發佈能夠說是一個時代的開始,將來將會更進一步發展,引入更多自適應規則,讓Spark能夠隨着數據分佈和特性的變化自動改變Query plan,讓更多的query編譯靜態優化變成運行時的動態優化。

(二)Dynamic Partition Pruning

Dynamic Partition Pruning也是一個運行時的動態優化方法,簡單來講就是咱們能夠經過Query的某些分支的中間結果來避免沒必要要的partition讀取,這種方法是沒法經過編譯期推測出來的,只能在運行時根據結果來判斷,這種方法對數據倉庫的star-schema效果很是明顯,在TPC-DS得到了很是明顯的加速,能夠加速2-18倍。

(三)Join Hints

Join Hints是一個很是廣泛的數據庫的優化策略,在3.0以前已經有了Broadcast hash join,3.0以後的版本加了Sort-merge join、Shuffle hash join和 Shuffle nested loop join,可是要注意謹慎使用,由於數據的特性不一樣,很難保證一直有效,即便有效,也不表明一直有效,隨着時間的變化,你的數據變了,可能會讓你的query 變慢,變得不穩定。整體來講上面的四種Join的適用條件和特色以下所示,總而言之,使用Join Hints要謹慎。

2、Richer APIs

Spark3.0簡化了開發,不但增長了更多的新功能,也改善了衆多現有的功能,讓更多的用法成爲可能,主要有:

  • Accelerator-aware Scheduler

  • Built-in Functions

  • pandas UDF enhancements

  • DELETE/UPDATE/MERGE in Catalyst

(一)pandas UDF enhancements

pandas UDF應該說是PySPark用戶中最喜好的特性之一,對於其功能和性能的提高應該都是喜聞樂見的,其發展歷程以下圖所示。

最新的pandas UDF和以前的不一樣之處在於引入了Python Type Hints,如今用戶可使用pandas中的數據類型好比pandas.Series等來表示pandas UDF的種類,再也不須要記住原來的UDF類型,只須要指定正確的輸入和輸出類型便可。此外,pandas UDF能夠分爲pandas UDF和pandas API。

(二)Accelerator-aware Scheduler

Accelerator-aware Scheduler是加速器的調度支持,狹義上也就是指GPU調度支持。加速器常常用來對特定負載作加速,目前,用戶仍是須要指定什麼應用須要加速器資源,可是在未來咱們會支持job或者stage級別的調度。Spark3.0中咱們已經支持大多調度器,此外,咱們還能夠經過Web UI來監控GPU的使用,歡迎你們使用,更多詳細資料你們能夠到社區學習。

(三)Built-in Functions

爲了讓Spark3.0更方便實用,Spark社區按照其餘的主流,好比數據庫廠商等,內嵌瞭如上圖所示的32個經常使用函數,這樣用戶就無須本身寫UDF,而且速度更快。好比針對map類型,Spark3.0新增長了map_keys和map_values,更加地方便易用。其餘新增長的更多內嵌函數你們能夠到社區具體瞭解。

3、Monitoring and Debuggability

Spark3.0也增長了一些對監控和調優的改進,主要有:

  • Structured Streaming UI

  • DDL/DML Enhancements

  • Observable Metrics

  • Event Log Rollover

(一)Structured Streaming UI

Structured Streaming是在Spark2.0中發佈的,在Spark3.0中加入了UI的配置。新的UI主要包括了兩種統計信息:已完成的Streaming查詢聚合信息和未完成的Streaming查詢的當前信息,包括Input Rate、Process Rate、Batch Duration和Operate Duration。

(二)DDL/DML Enhancements

咱們還增長了各類DDL/DML命令,好比EXPLAIN和。
EXPLAIN是性能調優的必備工具,讀取EXPLAIN是每一個用戶的基本功,可是隨着系統的運行,EXPLAIN的信息愈來愈多,並且信息多元、多樣,在新的版本中咱們引入了新的FORMATTED模式,以下所示,在開頭處有一個很是精簡的樹狀圖,且以後的每一個部分都有很詳細的解釋,更容易加更多的注意,這就從水平擴展變成了垂直擴展,更加的直觀。

(三)Observable Metrics

咱們還引入了Observable Metrics用以觀測數據的質量。要知道數據質量對於不少Spark應用都是至關重要的,一般定義數據質量的Metrics仍是很是容易的,好比用一些聚合參數,可是算出這個Metrics的值就很是麻煩,尤爲對於流計算來講。

4、SQL Compatibility

SQL兼容性也是Spark必不可提的話題,良好的兼容性更方便用戶遷移到Spark平臺,在Spark3.0中新增的主要功能有:

  • ANSI Store Assignment

  • Overflow Checking

  • Reserved Keywords in Parser

  • Proleptic Gregorian Calendar

也就是說,這個版本中咱們讓insert遵照了ANSI Store Assignment,而且增長了運行時的overflow的檢查,還提供了一個模式讓SQL Parser來準確地遵照ANSI標準的保留字,還切換了Calendar,這樣更加符合ANSI的SQL標準。好比說咱們想要插入兩列數據,類型是int和string,若是將int插入到了string中,仍是能夠的,不會發生數據精度的損失和數據丟失;可是若是咱們嘗試將string類型插入到int類型中,就有可能發生數據損失甚至丟失。ANSI Store Assignment+Overflow Checking在輸入不合法的時候就會在運行時拋出異常,須要注意的是這個設置默認是關閉的,能夠根據我的須要打開。

5、Built-in Data Sources

在這個版本中咱們提高了預裝的數據源,好比Parquet table,咱們能夠對Nested Column作Column Pruning和Filter Pushdown,此外還支持了對CSV的Filter Pushdown,還引入了Binary Data Source來處理相似於二進制的圖片文件。

6、Extensibility and Ecosystem

Spark3.0繼續增強了對生態圈的建設:

  • 對Data Source V2 API的持續改善和catalog支持;

  • 支持Java 11;

  • 支持Hadoop 3;

  • 支持Hive 3。

(一)Data Source V2 API+Catalog Support

Spark3.0加上了對Catalog的支持來擴展Data Source API。Catalog plugin API可讓用戶註冊本身的catalog來實現對元數據的處理,這樣可讓Spark用戶更簡單方便的使用數據源的表。對於沒有實現Catalog plugin的數據源,用戶須要先註冊每一個外部數據源的表才能訪問,可是實現了Catalog plugin API以後咱們只須要註冊Catalog,而後就能夠直接遠程訪問和操做catalog的表。對於數據源的開發者來講,何時支Data Source V2 API呢?下面是幾點建議:

不過這裏須要注意,Data Source V2還不是很穩定,開發者可能在將來還須要調整相關API的實現。
大數據的發展至關迅速,Spark3.0爲了能更方便的部署,咱們升級了對各個組件和環境版本的支持,可是要注意如下事項。

關於生態圈,這裏要提一下Koalas,它是一個純的Python庫,用Spark實現了絕大部分的pandas API,讓pandas用戶除了能夠處理小數據,也能夠處理大數據。Koalas對於pandas用戶來講能夠將pandas的代碼擴展到大數據處理,使得學習PySpark變得更簡單;對於現有的PySpark用戶來講,多了更多的選擇,能夠用pandas API來解決生產力問題。過去一年多,Koalas的下載量是驚人的,在pip的下載量單日已經超過了37000,並且還在不斷增加,5月的下載量也達到了85萬。Koalas的代碼其實很少,主要是API的實現,執行仍是由Spark來作,因此Spark性能的提高對於Koalas用戶來講是直接受益的。Koalas的發佈週期當頻密,目前已經有33個發佈,歡迎你們下載使用。

如何讀和理解Spark UI對大多數新用戶來講是一個很大的挑戰,尤爲對SQL用戶來講,在Spark3.0中咱們增長了本身的UI文檔https://spark.apache.org/docs/latest/web-ui.html
而且增長了SQL Reference ,https://spark.apache.org/docs/latest/sql-ref.html
等,更詳細的文檔使得用戶上手Spark的時候更加容易,歡迎你們去試一試Spark3.0,感覺Spark的強大。


關鍵詞:Spark3.0、SQL、PySpark、Koalas、pandas、UDF、AQE

相關內容推薦

自適應查詢執行AQE:在運行時加速SparkSQL

Apache Spark 3.0中的SQL性能改進概覽

Structured Streaming生產化實踐及調優

Apache Spark 3.0對Prometheus監控的原生支持



阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,按期推送精彩案例,技術專家直播,問答區近萬人Spark技術同窗在線提問答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大數據和感興趣的同窗能夠加小編微信(下圖二維碼,備註「進羣」)進入技術交流微信羣。

Apache Spark技術交流社區公衆號,微信掃一掃關注


本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索