spark基礎知識彙總

基礎

概述

  • Spark計算平臺有兩個重要角色,Driver和executor
  • Driver
    • Driver充當Application的master角色,負責任務執行計劃生成和任務分發及調度;
    • Driver負責生成邏輯查詢計劃、物理查詢計劃和把任務派發給executor
  • Executor
    • Executor充當worker角色,負責實際執行任務的task,計算的結果返回Driver。
    • Executor接受任務後進行處理,離線計算也是按這個流程進行。

分工

  • RDD Objects生成邏輯查詢計劃
  • 生成物理查詢計劃DAGScheduler
  • 任務調度TaskScheduler
  • 任務執行Executor

做業提交流程

  1. client submit做業,經過反射invoke執行用戶代碼main函數,啓動CoarseGrainedExecutorBackend、初始化SparkContext。
  2. SparkContext初始化包括初始化監控頁面SparkUI、執行環境SparkEnv、安全管理器SecurityManager、stage劃分及調度器DAGScheduler、task做業調度器TaskSchedulerImpl和與Executor通訊的調度端CoarseGrainedSchedulerBackend。
  3. DAGScheduler將做業劃分後,依次提交stage對應的taskSet給TaskSchedulerImpl。
  4. TaskSchedulerImpl會submit taskset給driver端的CoarseGrainedSchedulerBackend後端。
  5. CoarseGrainedSchedulerBackend會逐個LaunchTask
  6. 在遠端的CoarseGrainedExecutorBackend接收到task提交event後,會調用Executor執行task
  7. 最終task是由TaskRunner的run方法內運行。

image

Executor

  • 首先executor端的rpc服務端點收到LaunchTask的消息,並對傳過來的任務數據進行反序列化成TaskDescription將任務交給Executor對象運行
  • Executor根據傳過來的TaskDescription對象建立一個TaskRunner對象,並放到線程池中運行。這裏的線程池用的是Executors.newCachedThreadPool,空閒是不會有線程在跑
  • TaskRunner對任務進一步反序列化,調用Task.run方法執行任務運行邏輯
  • ShuffleMapTask類型的任務會將rdd計算結果數據通過排序合併以後寫到一個文件中,並寫一個索引文件
  • 任務運行完成後會更新一些任務統計量和度量系統中的一些統計量
  • 最後會根據結果序列化後的大小選擇不一樣的方式將結果傳回driver。

共享變量

Broadcast Variable(廣播變量)

  • Broadcast Variable會將使用到的變量,僅僅爲每一個節點拷貝一份,而不是給節點上的每一個task拷貝一份。這樣能夠優化性能,減小網絡傳輸及內存消耗。
  • Broadcast Variable主要用於共享讀,是隻讀的,無法去寫
  • 能夠經過調用SparkContext的broadcast()方法,來針對某個變量建立廣播變量,返回類型是Broadcast 。而後在算子的函數內,使用廣播變量,此時每一個節點都只會拷貝一份,每一個節點可使用廣播變量的value()方法獲取值。廣播變量是隻讀的,不可寫。

Accumulator(累加變量)

Accumulator可讓多個task共同操做一份變量,主要能夠進行累加操做。node

內存管理

  • spark.executor.memory設置executor可用內存,包含
    • reservedMemory :默認300M
    • usableMemory
      • Execution 內存: 存放 Shuffle、Join、Sort、Aggregation 等計算過程當中的臨時數據
      • Storage 內存: 存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據;
      • 用戶內存(User Memory):存儲 RDD 轉換操做所須要的數據,例如 RDD 依賴等信息

相關配置

  • spark.memory.storageFraction:配置usableMemory中Storage內存佔比
  • spark.memory.offHeap.enabled:堆外內存是否開啓,默認不開啓
  • spark.memory.offHeap.size:堆外內存大小

堆內內存

默認狀況下,Spark 僅僅使用了堆內內存。Executor 端的堆內內存區域大體能夠分爲如下四大塊web

  • Execution 內存:主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程當中的臨時數據
  • Storage 內存:主要用於存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據;
  • 用戶內存(User Memory):主要用於存儲 RDD 轉換操做所須要的數據,例如 RDD 依賴等信息。
  • 預留內存(Reserved Memory):系統預留內存,會用來存儲Spark內部對象。

堆外內存

經過 spark.memory.offHeap.enabled 參數啓用,而且經過 spark.memory.offHeap.size 設置堆外內存大小,單位爲字節。若是堆外內存被啓用,那麼 Executor 內將同時存在堆內和堆外內存,二者的使用互補影響,
堆外內存只區分 Execution 內存和 Storage 內存算法

Execution 內存和 Storage 內存動態調整

  • 若Execution內存與Storage內存都不足時,按照LRU規則存儲到磁盤;
  • 若Execution內存不足,Storage內存有結餘,Storage 內存的空間被佔用後,目前的實現是沒法讓對方"歸還"
  • 若Storage內存不足,Execution內存有結餘,Execution內存的空間被佔用後,可以讓對方將佔用的部分轉存到硬盤,而後"歸還"借用的空間,由於Cache在內存的數據不必定後面會用

Task之間內存分佈

  • Task共享着 Execution 內存
  • Spark 內部維護了一個 HashMap 用於記錄每一個 Task 佔用的內存
  • 每一個Task可用內存爲Executor可用內存的 1/2N ~ 1/N,N是Task的個數

Spark Core

spark的shuffle

  • 前一個stage 的 ShuffleMapTask 進行 shuffle write, 把數據存儲在 blockManager 上面, 而且把數據位置元信息上報到 driver 的 mapOutTrack 組件中, 下一個 stage 根據數據位置元信息, 進行 shuffle read, 拉取上個stage 的輸出數據。

shuffle write

分爲三種writer, 分爲 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWritersql

BypassMergeSortShuffleWriter
  • 開啓map side combine而且分區數較少
  • BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter實現基本一致, 惟一的區別在於,map端的多個輸出文件會被彙總爲一個文件。 全部分區的數據會合併爲同一個文件,會生成一個索引文件,是爲了索引到每一個分區的起始地址,能夠隨機 access 某個partition的全部數據。
  • 這種方式不宜有太多分區,由於過程當中會併發打開全部分區對應的臨時文件,會對文件系統形成很大的壓力。
  • 給每一個分區分配一個臨時文件,對每一個 record的key 使用分區器(模式是hash,若是用戶自定義就使用自定義的分區器)找到對應分區的輸出文件句柄,直接寫入文件,沒有在內存中使用 buffer。 最後copyStream方法把全部的臨時分區文件拷貝到最終的輸出文件中,而且記錄每一個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。
SortShuffleWriter
  • 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在內存中進行排序, 排序的 K 是(partitionId, hash(key)) 這樣一個元組。
  • 若是超過內存 limit, 我 spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionId的排序,若是 partitionId 相同, 再根據 hash(key)進行比較排序
  • 若是須要輸出全局有序的文件的時候,就須要對以前全部的輸出文件 和 當前內存中的數據結構中的數據進行 merge sort, 進行全局排序
UnsafeShuffleWriter
  • UnsafeShuffleWriter須要Serializer支持relocation
  • UnsafeShuffleWriter 裏面維護着一個 ShuffleExternalSorter, 用來作外部排序
區別 UnsafeShuffleWriter SortShuffleWriter
排序方式 最終只是 partition 級別的排序 先 partition 排序,相同分區 key有序
aggregation 沒有飯序列化,沒有aggregation 支持 aggregation

shuffle read

內存管理——Tungsten

  • TaskMemoryManager用來統一這兩種內存:堆內內存和堆外內存
  • MemoryBlock 繼承 MemoryLocation 表明着對內存的定位,這個對象能夠把off-heap 和on-heap 進行統一, MemoryLocation 對於off-heap的memory,obj爲null,offset則爲絕對的內存地址,對於on-heap的memory,obj則是JVM對象的基地址,offset則是相對於改對象基地址的偏移。

Spark SQL

Parser模塊

  • 將sparkSql字符串切分紅一個一個token,再根據必定語義規則解析爲一個抽象語法樹/AST。Parser模塊目前基本都使用第三方類庫ANTLR來實現,好比Hive,presto,sparkSql等。
  • SqlBaseLexer和SqlBaseParser都是使用ANTLR4自動生成的Java類。使用這兩個解析器將SQL字符串語句解析成了ANTLR4的ParseTree語法樹結構。而後在parsePlan過程當中,使用AstBuilder.scala將ParseTree轉換成catalyst表達式邏輯計劃LogicalPlan。

Analyzer模塊

  • 基本的元數據信息schema catalog來表達這些token。最重要的元數據信息就是,
    • 表的schema信息,主要包括表的基本定義(表名、列名、數據類型)、表的數據格式(json、text、parquet、壓縮格式等)、表的物理位置
    • 基本函數信息,主要是指類信息
  • Analyzer會再次遍歷整個AST,對樹上的每一個節點進行數據類型綁定以及函數綁定,好比people詞素會根據元數據表信息解析爲包含age、id以及name三列的表,people.age會被解析爲數據類型爲int的變量,sum會被解析爲特定的聚合函數,

Optimizer模塊

  • Optimizer是catalyst的核心,分爲RBO和CBO兩種。
  • RBO的優化策略就是對語法樹進行一次遍歷,模式匹配可以知足特定規則的節點,再進行相應的等價轉換,即將一棵樹等價地轉換爲另外一棵樹。SQL中經典的常見優化規則有,
    +謂詞下推(predicate pushdown)
    +常量累加(constant folding)
    +列值裁剪(column pruning)
    • Limits合併(combine limits)

SparkPlanner模塊

  • 至此,OLP已經獲得了比較完善的優化,然而此時OLP依然沒有辦法真正執行,它們只是邏輯上可行,實際上spark並不知道如何去執行這個OLP。
  • 此時就須要將左邊的OLP轉換爲physical plan物理執行計劃,將邏輯上可行的執行計劃變爲spark能夠真正執行的計劃。
  • 好比join算子,spark根據不一樣場景爲該算子制定了不一樣的算法策略,有broadcastHashJoin、shuffleHashJoin以及sortMergeJoin,物理執行計劃實際上就是在這些具體實現中挑選一個耗時最小的算法實現,這個過程涉及到cost model/CBO

WholeStageCodegen

  • WholeStageCodegen,將多個operators合併成一個java函數,從而提升執行速度

Spark MLLib

Spark Streaming

  • 數據流通過Spark Streaming的receiver,數據切分爲DStream(相似RDD,DStream是Spark Streaming中流數據的邏輯抽象),而後DStream被Spark Core的離線計算引擎執行並行處理。

流程

  • 實時計算與離線計算同樣,主要組件是Driver和Executor的。不一樣的是多了數據採集和數據按時間分片過程,數據採集依賴外部數據源,這裏用MessageQueue表示
  • 數據分片則依靠內部一個時鐘Clock,按batch interval來定時對數據分片,而後把每個batch interval內的數據提交處理。
  • Executor從MessageQueue獲取數據並交給BlockManager管理,而後把元數據信息BlockID返給driver的Receiver Tracker
  • driver端的Job Jenerator對一個batch的數據生成JobSet,最後把做業執行計劃傳遞給executor處理。

image

Structure Streaming

將數據抽象爲DataFrame,即無邊界的表,經過將數據源映射爲一張無界長度的表,經過表的計算,輸出結果映射爲另外一張表。這樣以結構化的方式去操做流式數據,簡化了實時計算過程,同時還複用了其Catalyst引擎來優化SQL操做。此外還能支持增量計算和基於event time的計算。json

Spark thrift server優化

  • spark常駐Driver
  • 增長用戶概念,application有用戶歸屬

實現流程

  • spark thrift server基於hive jdbc服務實現

特性

  • 支持手動緩存中間結果集,Statement級別複用
  • 支持提交離線做業、監控離線做業、清理離線做業
    • 增長SparkListener監控job狀態,而後取數據
    • 調用spark restful接口 kill job

ES-Spark優化

Spark DirectQuery ES

  • 原生API會解析json,拼接到query中,不靈活
    • 例如query+highlight
    • query + sort
    • 等等
  • Spark-SQL 的 where 語句所有(或者部分)下沉到 ES裏進行執行,依賴於倒排索引,DocValues,以及分片,並行化執行,ES可以得到比Spark-SQL更優秀的響應時間
  • 分片數據Merge(Reduce操做,Spark 能夠得到更好的性能和分佈式能力),更復雜的業務邏輯都交給Spark-SQL (此時數據規模已經小很是多了),而且能夠作各類自定義擴展,經過udf等函數
  • ES 無需實現Merge操做,能夠減輕內存負擔,提高並行Merge的效率(而且現階段彷佛ES的Reduce是隻能在單個實例裏完成)

實現流程

## package.scala
+ 增長接口

def esDirectRDD(resource: String, cfg: scala.collection.Map[String, String]) = EsSpark.esDirectRDD(sc, resource, cfg)後端

## EsSpark
+ 增長接口

def esDirectRDD(sc: SparkContext, resource: String, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])] =
new ScalaEsRDDMap[String, AnyRef]數組

## ScalaESDirectRDD
+ 繼承AbstractDirectEsRDD


## AbstractDirectEsRDD
+ 抽象ESRDD類

## AbstractEsDirectRDDIterator
+ 對應AbstractEsRDDIterator
+ 處理直連查詢

## PartitionDirectReader
+ 處理director查詢

## SearchRequestBuilder
+ 增長buildDirect方法

## DirectQuery
+ 與ScrollQuery一致
+ 包含DirectReader的引用,處理結果集
+ 調用RestRepository,處理結果集


## RestRepository
### direrctQuery
+ 發起Restful請求,並使用傳入的Reader處理返回的結果;
+ 原始scroll接口,處理scroll查詢
+ 增長directQuery接口,處理direrctQuery查詢

### scanDirect
+ 直接建立DirectQuery


## DirectSearchRequestBuilder
+ 構建查詢;

## DirectReader
+ 對應ScrollReader
+ 處理結果集請求
+ 從Hits開始解析

## SimpleQueryParser
+ 增長parseESQuery

agg支持

  • 實現流程相似,但執行節點爲節點級別,而不是shard級別
  • 數據結果解析,對agg別名設定規則
  • 結果解析爲同一個partition/各個shard對應一個partition,須要二次處理
  • 後續:設定參數,進行shard級別聚合,並對聚合結果改寫、合併

spark MLLib

源碼

thrift server

ES-Hadoop

調優

動態資源分配

  • spark.dynamicAllocation.enabled:該配置項用於配置是否使用動態資源分配,根據工做負載調整應用程序註冊的executor的數量。默認爲false(至少在spark2.2-spark2.4中如此),在CDH發行版中默認爲true,
  • 若是啓用動態分配,在executor空閒spark.dynamicAllocation.executorIdleTimeout(默認60s)以後將被釋放。

動態資源分配策略

開啓動態分配策略後,application會在task因沒有足夠資源被掛起的時候去動態申請資源,這種狀況意味着該application現有的executor沒法知足全部task並行運行。spark一輪一輪的申請資源,當有task掛起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默認1s)時間的時候,會開始動態資源分配;以後會每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默認1s)時間申請一次,直到申請到足夠的資源。每次申請的資源量是指數增加的,即1,2,4,8等。
之因此採用指數增加,出於兩方面考慮:其一,開始申請的少是考慮到可能application會立刻獲得知足;其次要成倍增長,是爲了防止application須要不少資源,而該方式能夠在不多次數的申請以後獲得知足。緩存

資源回收策略

當application的executor空閒時間超過spark.dynamicAllocation.executorIdleTimeout(默認60s)後,就會被回收。安全

內存佔用

調優內存的使用主要有三個方面的考慮:對象的內存佔用量(你可能但願整個數據集都適合內存),訪問這些數據的開銷,垃圾回收的負載。

默認狀況下,java的對象是能夠快速訪問的,可是相比於內部的原始數據消耗估計2-5倍的空間。主要歸於下面三個緣由:
1),每一個不一樣的Java對象都有一個「對象頭」,它大約是16個字節,包含一個指向它的類的指針。對於一個數據不多的對象(好比一個Int字段),這能夠比數據大。
2),Java字符串在原始字符串數據上具備大約40字節的開銷(由於它們將它們存儲在一個Chars數組中,並保留額外的數據,例如長度),而且因爲String的內部使用UTF-16編碼而將每一個字符存儲爲兩個字節。所以,一個10個字符的字符串能夠容易地消耗60個字節。
3),經常使用集合類(如HashMap和LinkedList)使用連接的數據結構,其中每一個條目都有一個「包裝器」對象(例如Map.Entry)。該對象不只具備頭部,還包括指針(一般爲8個字節)到列表中的下一個對象。
4),原始類型的集合一般將它們存儲爲「boxed」對象,如java.lang.Integer。

肯定內存的消耗

最好的方式去計算一個數據的的內存消耗,就是建立一個RDD,而後加入cache,這樣就能夠在web ui中Storage頁面看到了。頁面會告訴你,這個RDD消耗了多少內存。
要估計特定對象的內存消耗,請使用SizeEstimator的估計方法。這對於嘗試使用不一樣的數據佈局來修剪內存使用狀況以及肯定廣播變量在每一個執行程序堆中佔用的空間量很是有用。

調優數據結構

減小內存消耗的第一種方法是避免使用增長負擔的java特性,例如基於指針的數據結構和包裝對象。下面幾種方法能夠來避免這個。

  • 1,將數據結構設計爲偏好對象數組和原始類型,而不是標準的Java或Scala集合類(例如HashMap)。fastutil庫(http://fastutil.di.unimi.it/)爲與Java標準庫兼容的原始類型提供方便的集合類
  • 2,儘量避免使用有不少小對象和指針的嵌套結構。
  • 3,針對關鍵詞,考慮使用數字ID或者枚舉對象而不是字符串。
  • 4,若是您的RAM少於32 GB,請設置JVM標誌-XX:+ UseCompressedOops使指針爲四個字節而不是八個字節。您能夠在spark-env.sh中添加這些選項。

序列化RDD

儘管進行了調優,當您的對象仍然太大而沒法有效存儲時,一個簡單的方法來減小內存使用是使用RDD持久性API中的序列化StorageLevel(如MEMORY_ONLY_SER)以序列化形式存儲它們。Spark將會將每一個RDD分區存儲爲一個大字節數組。以序列化形式存儲數據的惟一缺點是數據訪問變慢,由於必須對每一個對象進行反序列化。若是您想以序列化形式緩存數據,咱們強烈建議使用Kryo,由於它會使數據比java序列化後的大小更小(並且確定比原Java對象更小)。

垃圾回收調優

  • 垃圾收集的成本與Java對象的數量成正比,所以使用較少對象的數據結構(例如,Ints數組,代替LinkedList)將大大下降了成本。一個更好的方法是以序列化形式持久化對象,如上所述:每一個RDD分區將只有一個對象(一個字節數組)。在嘗試其餘技術以前,若是GC是一個問題,首先要嘗試的是使用序列化緩存。

因爲任務的運行內存和RDD的緩存內存的干擾,GC也會是一個問題。

測量GC的影響

GC調優的第一步是收集關於垃圾收集發生頻率和GC花費的時間的統計信息。經過將-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStamps添加到Java選項來完成。下次運行Spark做業時,每當垃圾收集發生時,都會看到在工做日誌中打印的消息。請注意,這些日誌將在您的羣集的Executor節點上(在其工做目錄中的stdout文件中),而不是您的driver功能中。

高級GC調優

Spark應用程序GC調優的目標是,確保生命週期比較長的RDD保存在老年代,新生代有足夠的空間保存生命週期比較短的對象。這有助於避免觸發Full GC去收集task運行期間產生的臨時變量。下面列舉幾個有用的步驟:

  • 1),經過收集垃圾回收信息,判斷是否有太多的垃圾回收過程。假如full gc在一個task完成以前觸發了好幾回,那說明運行task的內存空間不足,須要加內存。
  • 2),在gc的統計信息中,若是老年代接近滿了,減小用於緩存的內存(經過減少spark.memory.Fraction)。緩存較少的對象比下降運行速度對咱們來講更有好處。另外,能夠考慮減小年輕代。能夠經過減少-Xmn參數設置的值,假如使用的話。假如沒有設置能夠修改JVM的NewRation參數。大多數JVMs默認值是2,意思是老年代佔用了三分之二的總內存。這個值要足夠大,至關於擴展了spark.memory.fraction.
  • 3),若是有太多的minor gc,較少的major gc,增長Eden區內存會有幫助。將Eden區內存設置的比task運行估計內存稍微大一些。若是Eden區大小肯定爲E,那就將新生代的內存設置爲-Xmn=4/3E,按比例增長內存是考慮到survivor區所佔用的內存。
  • 4),嘗試經過設置-XX:+UseG1GC垃圾回收器爲G1。在垃圾回收器是瓶頸的一些狀況下,它能夠提升性能。請注意,對於大的Executor堆,經過使用-XX:G!HeapRegionSize去增大G1的堆大小,顯得尤其重要。
  • 5),例如,若是您的任務是從HDFS讀取數據,則可使用從HDFS讀取的數據塊的大小來估計任務使用的內存量。請注意,解壓縮塊的大小一般是塊大小的2或3倍。因此若是咱們但願有3或4個任務的工做空間,HDFS塊的大小是64 MB,咱們能夠估計Eden的大小是4 * 3 * 64MB。
  • 6),監控垃圾收集的頻率和時間如何隨着新設置的變化而變化。

經驗代表,GC調整的效果取決於您的應用程序和可用的內存量。

並行度

併發不足會致使集羣浪費。

  • Spark自動會根據文件的大小,是否可分割等因素來設置map的數目;
  • 對於分佈式reduce操做,例如groupbykey和reducebykey,reduce默認數量是分區數最大的父RDD的分區數;
  • 你也能夠經過設置spark.default.parallelism來改變默認值,建議值是每一個CPU執行2-3個tasks。

Reduce任務的內存使用

  • 內存溢出並必定是RDD不適合放在內存裏面,可能由於task的數據集太大了。
  • Spark的shuffle操做(sortByKey, groupByKey, reduceByKey, join, etc)會構建一個hash表,每一個task執行一個分組的數據,單個每每會很大。
  • 最簡單的改善方法是增長並行度,讓每一個task的輸入變得更小。
  • Spark能夠高效的支持短達200ms的任務,由於複用了Executor的JVM,這能夠下降啓動成本,因此你能夠很安全的增長並行度,使其超過你的集羣core數目。

廣播變量

  • spark的廣播功能能夠大幅度減小每一個序列化後的task的大小,也能夠減小在集羣中執行一個job的代價。
  • 若是你的任務中使用了大的對象,好比靜態表,能夠考慮將它聲明成廣播變量。
  • 在driver節點,spark會打印出每一個task序列化後的大小,因此你能夠經過查看task的大小判斷你的task是否過大,一般task的大小超過20KB就值得調優了。

數據本地化

  • 數據的本地性可能會對Spark jobs產生重大影響。若是數據和在其上操做的代碼在一塊兒,則計算每每是快速的。但若是代碼和數據分開,則必需要有一方進行移動。典型的狀況是將序列化後的代碼移動到數據所在的地方,由於數據每每比代碼大不少。Spark構建調度計劃的原則就是數據本地性。
  • 根據數據和代碼當前的位置,數據本地性等級。
  • Spark傾向於調度任務依據最高的數據本地性,但這每每是不可能的。在任何空閒的Executor上沒有未處理數據的狀況下,Spark會切換到較低的數據本地性。這種狀況下會有兩個選擇:
    • 1),等待CPU空閒,而後在相同的server上啓動task。
    • 2),當即在一個須要遷移數據的較遠位置啓動一個新的task。
  • Spark的典型處理策略是等待繁忙CPU釋放,時間很短。一旦超時,將移動數據到空閒CPU的地方執行任務。每一個級別之間的回退等待超時能夠在一個參數中單獨配置或所有配置。若是任務較長,且數據本地性較差,能夠適當調整Spark.locatity超時時間相關的配置。

從最近到最遠的順序列出以下:

PROCESS_LOCAL

數據和代碼在同一個JVM中,這是最佳的數據本地性。

NODE_LOCAL

數據和代碼在相同的節點。好比數據在同一節點的HDFS上,或者在統一節點的Executor上。因爲數據要在多個進程間移動,因此比PROCESS_LOCAL稍慢。

NO_PREF

數據能夠從任何地方快速訪問,沒有數據本地性。

RACK_LOCAL

數據和代碼在相同的機架。數據位於同一機架上的不一樣服務器上,所以須要經過網絡發送,一般經過單個交換機發送

ANY

數據在網絡上的其餘地方,而不在同一個機架中。

數據傾斜

數據源

  • 儘可能使用可切分的格式代替不可切分的格式,或者保證各文件實際包含數據量大體相同。

調整並行度分散同一個Task的不一樣Key

Spark在作Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。若是並行度設置的不合適,可能形成大量不相同的Key對應的數據被分配到了同一個Task上,形成該Task所處理的數據遠大於其它Task,從而形成數據傾斜。

若是調整Shuffle時的並行度,使得本來被分配到同一Task的不一樣Key發配到不一樣Task上處理,則可下降原Task所需處理的數據量,從而緩解數據傾斜問題形成的短板效應。

適用場景

大量不一樣的Key被分配到了相同的Task形成該Task數據量過大。

自定義Partitioner

使用自定義的Partitioner(默認爲HashPartitioner),將本來被分配到同一個Task的不一樣Key分配到不一樣Task。

適用場景

大量不一樣的Key被分配到了相同的Task形成該Task數據量過大。

將Reduce side Join轉變爲Map side Join

正確的使用Broadcast實現Map側Join的方式是,經過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設置得足夠大。

適用場景

參與Join的一邊數據集足夠小,可被加載進Driver並經過Broadcast方法廣播到各個Executor中。

爲skew的key增長隨機前/後綴

爲數據量特別大的Key增長隨機前/後綴,使得原來Key相同的數據變爲Key不相同的數據,從而使傾斜的數據集分散到不一樣的Task中,完全解決數據傾斜問題。Join另外一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集做笛卡爾乘積,從而保證不管數據傾斜側傾斜Key如何加前綴,都能與之正常Join。

步驟

現經過以下操做,實現傾斜Key的分散處理

  • 將leftRDD中傾斜的key(即9500048與9500096)對應的數據單獨過濾出來,且加上1到24的隨機前綴,並將前綴與原數據用逗號分隔(以方便以後去掉前綴)造成單獨的leftSkewRDD
  • 將rightRDD中傾斜key對應的數據抽取出來,並經過flatMap操做將該數據集中每條數據均轉換爲24條數據(每條分別加上1到24的隨機前綴),造成單獨的rightSkewRDD
  • 將leftSkewRDD與rightSkewRDD進行Join,並將並行度設置爲48,且在Join過程當中將隨機前綴去掉,獲得傾斜數據集的Join結果skewedJoinRDD
  • 將leftRDD中不包含傾斜Key的數據抽取出來做爲單獨的leftUnSkewRDD
  • 對leftUnSkewRDD與原始的rightRDD進行Join,並行度也設置爲48,獲得Join結果unskewedJoinRDD
  • 經過union算子將skewedJoinRDD與unskewedJoinRDD進行合併,從而獲得完整的Join結果集

適用場景

兩張表都比較大,沒法使用Map則Join。其中一個RDD有少數幾個Key的數據量過大,另一個RDD的Key分佈較爲均勻。

大表隨機添加N種隨機前綴,小表擴大N倍

若是出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集所有加上隨機前綴,而後對另一個不存在嚴重數據傾斜的數據集總體與隨機前綴集做笛卡爾乘積(即將數據量擴大N倍)。

適用場景

一個數據集存在的傾斜Key比較多,另一個數據集數據分佈比較均勻。

Adaptive Execution

http://www.jasongj.com/spark/adaptive_execution/

參考

相關文章
相關標籤/搜索