Bucketing 就是利用 buckets(按列進行分桶)來決定數據分區(partition)的一種優化技術,它能夠幫助在計算中避免數據交換(avoid data shuffle)。並行計算的時候shuffle經常會耗費很是多的時間和資源.html
Bucketing 的基本原理比較好理解,它會根據你指定的列(能夠是一個也能夠是多個)計算哈希值,而後具備相同哈希值的數據將會被分到相同的分區。sql
Bucket的最終目的也是實現分區,可是和Partition的原理不一樣,當咱們根據指定列進行Partition的時候,Spark會根據列的名字對數據進行分區(若是沒有指定列名則會根據一個隨機信息對數據進行分區)。Bucketing的最大不一樣在於它使用了指定列的哈希值,這樣能夠保證具備相同列值的數據被分到相同的分區。函數
目前在使用 bucketBy 的時候,必須和 sortBy,saveAsTable 一塊兒使用,以下。這個操做實際上是將數據保存到了文件中(若是不指定path,也會保存到一個臨時目錄中)。post
df.write .bucketBy(10, "name") .sortBy("name") .mode(SaveMode.Overwrite) .option("path","/path/to") .saveAsTable("bucketed")
數據分桶保存以後,咱們才能使用它。測試
在一個SparkSession內,保存以後你能夠經過以下命令經過表名獲取其對應的DataFrame.大數據
val df = spark.table("bucketed")
其中spark是一個SparkSession對象。獲取以後就可使用DataFrame或者在SQL中使用表。優化
若是你要使用歷史保存的數據,那麼就不能用上述方法了,也不能像讀取常規文件同樣使用 spark.read.parquet() ,這種方式讀進來的數據是不帶bucket信息的。正確的方法是利用CREATE TABLE 語句,詳情可用參考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.htmlspa
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] USING data_source [OPTIONS (key1=val1, key2=val2, ...)] [PARTITIONED BY (col_name1, col_name2, ...)] [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS] [LOCATION path] [COMMENT table_comment] [TBLPROPERTIES (key1=val1, key2=val2, ...)] [AS select_statement]
示例以下:scala
spark.sql( """ |CREATE TABLE bucketed | (name string) | USING PARQUET | CLUSTERED BY (name) INTO 10 BUCKETS | LOCATION '/path/to' |""".stripMargin)
在咱們join兩個表的時候,若是兩個表最好按照相同的列劃分紅相同的buckets,就能夠徹底避免shuffle。根據前面所述的hash值計算方法,兩個表具備相同列值的數據會存放在相同的機器上,這樣在進行join操做時就不須要再去和其餘機器通信,直接在本地完成計算便可。假設你有左右兩個表,各有兩個分區,那麼join的時候實際計算就是下圖的樣子,兩個機器進行計算,而且計算後分區仍是2.code
而當須要shuffle的時候,會是這樣的,
細心的你可能發現了,上面兩個分區對應兩個Executor,下面shuffle以後對應的怎麼成了三個Executor了?沒錯,當數據進行shuffle以後,分區數就再也不保持和輸入的數據相同了,實際上也沒有必要保持相同。
咱們考慮的是大數據表的鏈接,本地測試的時候通常使用小的表,因此逆序須要將小表自動廣播的配置關掉。若是開啓小表廣播,那麼兩個小表的join以後分區數是不會變的,例如:
左表分區數 | 右表分區數數 | Join以後的分區數 |
---|---|---|
3 | 3 | 3 |
關閉配置的命令以下:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
正常狀況下join以後分區數會發生變化:
左表分區數 | 右表分區數數 | Join以後的分區數 |
---|---|---|
3 | 3 | 200 |
這個200其實就是 "spark.sql.shuffle.partitions" 配置的值,默認就是200. 因此若是在Join過程當中出現了shuffle,join以後的分區必定會變,而且變成spark.sql.shuffle.partitions的值。一般你須要根據本身的集羣資源修改這個值,從而優化並行度,可是shuffle是不可避免的。
實際測試結果以下:
左表Bucket數 | 右表Bucekt數 | Join以後的分區數 |
---|---|---|
8 | 4 | 8 |
4 | 4 | 4 |
Spark依然會利用一些Bucekt的信息,但具體怎麼執行目前還不太清楚,仍是保持一致的好。
另外,若是你spark job的可用計算核心數小於Bucket值,那麼從文件中讀取以後Bucekt值會變,就是說bucket的數目不會超過你能使用的最大計算核數。
在處理null值的時候,咱們可能會用到一些特殊的函數或者符號,以下表所示。可是在使用bucket的時候這裏有個坑,必定要躲過。join的時候千萬不要使用 <=> 符號,使用以後spark就會忽略bucket信息,繼續shuffle數據,緣由可能和hash計算有關。
若是你喜歡個人文章,能夠在任一平臺搜索【黑客悟理】關注我,很是感謝!