Apache Spark Jobs 性能調優

當你開始編寫 Apache Spark 代碼或者瀏覽公開的 API 的時候,你會遇到各類各樣術語,好比transformationactionRDD(resilient distributed dataset) 等等。 瞭解到這些是編寫 Spark 代碼的基礎。 一樣,當你任務開始失敗或者你須要透過web界面去了解本身的應用爲什麼如此費時的時候,你須要去了解一些新的名詞: jobstagetask。對於這些新術語的理解有助於編寫良好 Spark 代碼。這裏的良好主要指更快的 Spark 程序。對於 Spark 底層的執行模型的瞭解對於寫出效率更高的 Spark 程序很是有幫助。html

Spark 是如何執行程序的

一個 Spark 應用包括一個 driver 進程和若干個分佈在集羣的各個節點上的 executor 進程。node

driver 主要負責調度一些高層次的任務流(flow of work)。exectuor 負責執行這些任務,這些任務以 task 的形式存在, 同時存儲用戶設置須要caching的數據。 task 和全部的 executor 的生命週期爲整個程序的運行過程(若是使用了dynamic resource allocation 時可能不是這樣的)。如何調度這些進程是經過集羣管理應用完成的(好比YARN,Mesos,Spark Standalone),可是任何一個 Spark 程序都會包含一個 driver 和多個 executor 進程。程序員

在執行層次結構的最上方是一系列 Job。調用一個Spark內部的 action 會產生一個 Spark job 來完成它。 爲了肯定這些job實際的內容,Spark 檢查 RDD 的DAG再計算出執行 plan 。這個 plan 以最遠端的 RDD 爲起點(最遠端指的是對外沒有依賴的 RDD 或者 數據已經緩存下來的 RDD),產生結果 RDD 的 action 爲結束 。web

執行的 plan 由一系列 stage 組成,stage 是 job 的 transformation 的組合,stage 對應於一系列 task, task 指的對於不一樣的數據集執行的相同代碼。每一個 stage 包含不須要 shuffle 數據的 transformation 的序列。shell

什麼決定數據是否須要 shuffle ?RDD 包含固定數目的 partition, 每一個 partiton 包含若干的 record。對於那些經過narrow tansformation(好比 map 和 filter)返回的 RDD,一個 partition 中的 record 只須要從父 RDD 對應的partition 中的 record 計算獲得。每一個對象只依賴於父 RDD 的一個對象。有些操做(好比 coalesce)可能致使一個 task處理多個輸入 partition ,可是這種 transformation 仍然被認爲是 narrow 的,由於用於計算的多個輸入 record 始終是來自有限個數的 partitionapache

然而 Spark 也支持須要 wide 依賴的 transformation,好比 groupByKey,reduceByKey。在這種依賴中,計算獲得一個 partition 中的數據須要從父 RDD 中的多個 partition 中讀取數據。全部擁有相同 key 的元組最終會被聚合到同一個partition 中,被同一個 stage 處理。爲了完成這種操做, Spark須要對數據進行 shuffle,意味着數據須要在集羣內傳遞,最終生成由新的 partition 集合組成的新的 stageapi

舉例,如下的代碼中,只有一個 action 以及 從一個文本串下來的一系列 RDD, 這些代碼就只有一個 stage,由於沒有哪一個操做須要從不一樣的 partition 裏面讀取數據。緩存

sc.textFile("someFile.txt").
  map(mapFunc).
  flatMap(flatMapFunc).
  filter(filterFunc).
  count()

跟上面的代碼不一樣,下面一段代碼須要統計總共出現超過1000次的單詞:網絡

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
  reduceByKey(_ + _)
charCounts.collect()

這段代碼能夠分紅三個 stage。recudeByKey 操做是各 stage 之間的分界,由於計算 recudeByKey 的輸出須要按照能夠從新分配 partition數據結構

這裏還有一個更加複雜的 transfromation 圖,包含一個有多路依賴的 join transformation

粉紅色的框框展現了運行時使用的 stage 圖。

運行到每一個 stage 的邊界時,數據在父 stage 中按照 task 寫到磁盤上,而在子 stage 中經過網絡按照 task 去讀取數據。這些操做會致使很重的網絡以及磁盤的I/O,因此 stage 的邊界是很是佔資源的,在編寫 Spark 程序的時候須要儘可能避免的。父 stage 中 partition 個數與子 stage 的 partition 個數可能不一樣,因此那些產生 stage 邊界的 transformation 經常須要接受一個 numPartition 的參數來以爲子 stage 中的數據將被切分爲多少個 partition

正如在調試 MapReduce 是選擇 reducor 的個數是一項很是重要的參數,調整在 stage 邊屆時的 partition 個數常常能夠很大程度上影響程序的執行效率。咱們會在後面的章節中討論如何調整這些值。

選擇正確的 Operator

當須要使用 Spark 完成某項功能時,程序員須要從不一樣的 action 和 transformation 中選擇不一樣的方案以得到相同的結果。可是不一樣的方案,最後執行的效率可能有云泥之別。迴避常見的陷阱選擇正確的方案可使得最後的表現有巨大的不一樣。一些規則和深刻的理解能夠幫助你作出更好的選擇。

在最新的 Spark5097 文檔中開始穩定 SchemaRDD(也就是 Spark 1.3 開始支持的DataFrame),這將爲使用 Spark 核心API的程序員打開 Spark的 Catalyst optimizer,容許 Spark 在使用 Operator 時作出更加高級的選擇。當 SchemaRDD穩定以後,某些決定將不須要用戶去考慮了。

選擇 Operator 方案的主要目標是減小 shuffle 的次數以及被 shuffle 的文件的大小。由於 shuffle 是最耗資源的操做,因此有 shuffle 的數據都須要寫到磁盤而且經過網絡傳遞。repartition,join,cogroup,以及任何 *By 或者 *ByKey 的transformation 都須要 shuffle 數據。不是全部這些 Operator 都是平等的,可是有些常見的性能陷阱是須要注意的。

  • 當進行聯合的規約操做時,避免使用 groupByKey。舉個例子,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執行的結果是同樣的,可是前者須要把所有的數據經過網絡傳遞一遍,然後者只須要根據每一個key 局部的 partition 累積結果,在 shuffle 的以後把局部的累積值相加後獲得結果。
  • 當輸入和輸入的類型不一致時,避免使用 reduceByKey。舉個例子,咱們須要實現爲每個key查找全部不相同的 string。一個方法是利用 map 把每一個元素的轉換成一個 Set,再使用 reduceByKey 將這些 Set 合併起來
rdd.map(kv => (kv._1, new Set[String]() + kv._2))
    .reduceByKey(_ ++ _)

這段代碼生成了無數的非必須的對象,由於每一個須要爲每一個 record 新建一個 Set。這裏使用 aggregateByKey 更加適合,由於這個操做是在 map 階段作聚合。

val zero = new collection.mutable.Set[String]()
rdd.aggregateByKey(zero)(
    (set, v) => set += v,
    (set1, set2) => set1 ++= set2)
  • 避免 flatMap-join-groupBy 的模式。當有兩個已經按照key分組的數據集,你但願將兩個數據集合並,而且保持分組,這種狀況可使用 cogroup。這樣能夠避免對group進行打包解包的開銷。

何時不發生 Shuffle

固然瞭解在哪些 transformation 上不會發生 shuffle 也是很是重要的。當前一個 transformation 已經用相同的patitioner 把數據分 patition 了,Spark知道如何避免 shuffle。參考一下代碼:

rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

由於沒有 partitioner 傳遞給 reduceByKey,因此係統使用默認的 partitioner,因此 rdd1 和 rdd2 都會使用 hash 進行分 partition。代碼中的兩個 reduceByKey 會發生兩次 shuffle 。若是 RDD 包含相同個數的 partition, join 的時候將不會發生額外的 shuffle。由於這裏的 RDD 使用相同的 hash 方式進行 partition,因此所有 RDD 中同一個 partition 中的 key的集合都是相同的。所以,rdd3中一個 partiton 的輸出只依賴rdd2和rdd1的同一個對應的 partition,因此第三次shuffle 是沒必要要的。

舉個例子說,當 someRdd 有4個 partition, someOtherRdd 有兩個 partition,兩個 reduceByKey 都使用3個partiton,全部的 task 會按照以下的方式執行:

若是 rdd1 和 rdd2 在 reduceByKey 時使用不一樣的 partitioner 或者使用相同的 partitioner 可是 partition 的個數不一樣的狀況,那麼只有一個 RDD (partiton 數更少的那個)須要從新 shuffle

相同的 tansformation,相同的輸入,不一樣的 partition 個數:

當兩個數據集須要 join 的時候,避免 shuffle 的一個方法是使用 broadcast variables。若是一個數據集小到可以塞進一個executor 的內存中,那麼它就能夠在 driver 中寫入到一個 hash table中,而後 broadcast 到全部的 executor 中。而後map transformation 能夠引用這個 hash table 做查詢。

什麼狀況下 Shuffle 越多越好

儘量減小 shuffle 的準則也有例外的場合。若是額外的 shuffle 可以增長併發那麼這也可以提升性能。好比當你的數據保存在幾個沒有切分過的大文件中時,那麼使用 InputFormat 產生分 partition 可能會致使每一個 partiton 中彙集了大量的record,若是 partition 不夠,致使沒有啓動足夠的併發。在這種狀況下,咱們須要在數據載入以後使用 repartiton (會致使shuffle)提升 partiton 的個數,這樣可以充分使用集羣的CPU。

另一種例外狀況是在使用 recude 或者 aggregate action 彙集數據到 driver 時,若是數據把不少 partititon 個數的數據,單進程執行的 driver merge 全部 partition 的輸出時很容易成爲計算的瓶頸。爲了緩解 driver 的計算壓力,可使用reduceByKey 或者 aggregateByKey 執行分佈式的 aggregate 操做把數據分佈到更少的 partition 上。每一個 partition中的數據並行的進行 merge,再把 merge 的結果發個 driver 以進行最後一輪 aggregation。查看 treeReduce 和treeAggregate 查看如何這麼使用的例子。

這個技巧在已經按照 Key 彙集的數據集上格外有效,好比當一個應用是須要統計一個語料庫中每一個單詞出現的次數,而且把結果輸出到一個map中。一個實現的方式是使用 aggregation,在每一個 partition 中本地計算一個 map,而後在 driver中把各個 partition 中計算的 map merge 起來。另外一種方式是經過 aggregateByKey 把 merge 的操做分佈到各個partiton 中計算,而後在簡單地經過 collectAsMap 把結果輸出到 driver 中。

二次排序

還有一個重要的技能是瞭解接口 repartitionAndSortWithinPartitions transformation。這是一個聽起來很晦澀的transformation,可是卻能涵蓋各類奇怪狀況下的排序,這個 transformation 把排序推遲到 shuffle 操做中,這使大量的數據有效的輸出,排序操做能夠和其餘操做合併。

舉例說,Apache Hive on Spark 在join的實現中,使用了這個 transformation 。並且這個操做在 secondary sort 模式中扮演着相當重要的角色。secondary sort 模式是指用戶指望數據按照 key 分組,而且但願按照特定的順序遍歷 value。使用 repartitionAndSortWithinPartitions 再加上一部分用戶的額外的工做能夠實現 secondary sort。

在這篇文章中,首先完成在  Part I 中提到的一些東西。做者將盡可能覆蓋到影響 Spark 程序性能的方方面面,大家將會了解到資源調優,或者如何配置 Spark 以壓榨出集羣每一分資源。而後咱們將講述調試併發度,這是job性能中最難也是最重要的參數。最後,你將瞭解到數據自己的表達形式,Spark 讀取在磁盤的上的形式(主要是Apache Avro和 Apache Parquet)以及當數據須要緩存或者移動的時候內存中的數據形式。

調試資源分配

Spark 的用戶郵件郵件列表中常常會出現 「我有一個500個節點的集羣,爲何可是個人應用一次只有兩個 task 在執行」,鑑於 Spark 控制資源使用的參數的數量,這些問題不該該出現。可是在本章中,你將學會壓榨出你集羣的每一分資源。推薦的配置將根據不一樣的集羣管理系統( YARN、Mesos、Spark Standalone)而有所不一樣,咱們將主要集中在YARN 上,由於這個 Cloudera 推薦的方式。

咱們先看一下在 YARN 上運行 Spark 的一些背景。查看以前的博文:點擊這裏查看

Spark(以及YARN) 須要關心的兩項主要的資源是 CPU 和 內存, 磁盤 和 IO 固然也影響着 Spark 的性能,可是無論是 Spark 仍是 Yarn 目前都無法對他們作實時有效的管理。

在一個 Spark 應用中,每一個 Spark executor 擁有固定個數的 core 以及固定大小的堆大小。core 的個數能夠在執行 spark-submit 或者 pyspark 或者 spark-shell 時,經過參數 --executor-cores 指定,或者在 spark-defaults.conf 配置文件或者 SparkConf 對象中設置 spark.executor.cores 參數。一樣地,堆的大小能夠經過 --executor-memory 參數或者 spark.executor.memory 配置項。core 配置項控制一個 executor 中task的併發數。 --executor-cores 5 意味着每一個executor 中最多同時能夠有5個 task 運行。memory 參數影響 Spark 能夠緩存的數據的大小,也就是在 groupaggregate 以及 join 操做時 shuffle 的數據結構的最大值。

--num-executors 命令行參數或者spark.executor.instances 配置項控制須要的 executor 個數。從 CDH 5.4/Spark 1.3 開始,你能夠避免使用這個參數,只要你經過設置 spark.dynamicAllocation.enabled 參數打開 動態分配 。動態分配可使的 Spark 的應用在有後續積壓的在等待的 task 時請求 executor,而且在空閒時釋放這些 executor

同時 Spark 需求的資源如何跟 YARN 中可用的資源配合也是須要着重考慮的,YARN 相關的參數有:

  • yarn.nodemanager.resource.memory-mb 控制在每一個節點上 container 可以使用的最大內存;
  • yarn.nodemanager.resource.cpu-vcores 控制在每一個節點上 container 可以使用的最大core個數;

請求5個 core 會生成向 YARN 要5個虛擬core的請求。從 YARN 請求內存相對比較複雜由於如下的一些緣由:

  • --executor-memory/spark.executor.memory 控制 executor 的堆的大小,可是 JVM 自己也會佔用必定的堆空間,好比內部的 String 或者直接 byte buffer,executor memory 的 spark.yarn.executor.memoryOverhead 屬性決定向 YARN 請求的每一個 executor 的內存大小,默認值爲max(384, 0.7 * spark.executor.memory);
  • YARN 可能會比請求的內存高一點,YARN 的 yarn.scheduler.minimum-allocation-mb 和 yarn.scheduler.increment-allocation-mb 屬性控制請求的最小值和增長量。

下面展現的是 Spark on YARN 內存結構:

若是這些還不夠決定Spark executor 個數,還有一些概念還須要考慮的:

  • 應用的master,是一個非 executor 的容器,它擁有特殊的從 YARN 請求資源的能力,它本身自己所佔的資源也須要被計算在內。在 yarn-client 模式下,它默認請求 1024MB 和 1個core。在 yarn-cluster 模式中,應用的 master 運行 driver,因此使用參數 --driver-memory 和 --driver-cores 配置它的資源經常頗有用。
  • 在 executor 執行的時候配置過大的 memory 常常會致使過長的GC延時,64G是推薦的一個 executor 內存大小的上限。
  • 咱們注意到 HDFS client 在大量併發線程是時性能問題。大概的估計是每一個 executor 中最多5個並行的 task 就能夠佔滿的寫入帶寬。
  • 在運行微型 executor 時(好比只有一個core並且只有夠執行一個task的內存)扔掉在一個JVM上同時運行多個task的好處。好比 broadcast 變量須要爲每一個 executor 複製一遍,這麼多小executor會致使更多的數據拷貝。

爲了讓以上的這些更加具體一點,這裏有一個實際使用過的配置的例子,能夠徹底用滿整個集羣的資源。假設一個集羣有6個節點有NodeManager在上面運行,每一個節點有16個core以及64GB的內存。那麼 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 能夠設爲 63 * 1024 = 64512 (MB) 和 15。咱們避免使用 100% 的 YARN container 資源由於還要爲 OS 和 hadoop 的 Daemon 留一部分資源。在上面的場景中,咱們預留了1個core和1G的內存給這些進程。Cloudera Manager 會自動計算而且配置。

因此看起來咱們最早想到的配置會是這樣的:--num-executors 6 --executor-cores 15 --executor-memory 63G。可是這個配置可能沒法達到咱們的需求,由於: 
- 63GB+ 的 executor memory 塞不進只有 63GB 容量的 NodeManager; 
- 應用的 master 也須要佔用一個core,意味着在某個節點上,沒有15個core給 executor 使用; 
- 15個core會影響 HDFS IO的吞吐量。 
配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能會效果更好,由於: 
- 這個配置會在每一個節點上生成3個 executor,除了應用的master運行的機器,這臺機器上只會運行2個 executor 
- --executor-memory 被分紅3份(63G/每一個節點3個executor)=21。 21 * (1 - 0.07) ~ 19。

調試併發

咱們知道 Spark 是一套數據並行處理的引擎。可是 Spark 並非神奇得可以將全部計算並行化,它沒辦法從全部的並行化方案中找出最優的那個。每一個 Spark stage 中包含若干個 task,每一個 task 串行地處理數據。在調試 Spark 的job時,task的個數多是決定程序性能的最重要的參數。

那麼這個數字是由什麼決定的呢?在以前的博文中介紹了 Spark 如何將 RDD 轉換成一組 stagetask 的個數與 stage 中上一個 RDD 的 partition 個數相同。而一個 RDD 的 partition 個數與被它依賴的 RDD 的 partition 個數相同,除了如下的狀況: coalesce transformation 能夠建立一個具備更少 partition 個數的 RDD,union transformation 產出的 RDD的 partition 個數是它父 RDD 的 partition 個數之和, cartesian 返回的 RDD 的 partition 個數是它們的積。

若是一個 RDD 沒有父 RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 個數由它們底層使用的 MapReduce InputFormat 決定的。通常狀況下,每讀到的一個 HDFS block 會生成一個 partition。經過 parallelize 接口生成的 RDD 的 partition 個數由用戶指定,若是用戶沒有指定則由參數 spark.default.parallelism 決定。

要想知道 partition 的個數,能夠經過接口 rdd.partitions().size() 得到。

這裏最須要關心的問題在於 task 的個數過小。若是運行時 task 的個數比實際可用的 slot 還少,那麼程序解無法使用到全部的 CPU 資源。

過少的 task 個數可能會致使在一些彙集操做時, 每一個 task 的內存壓力會很大。任何 join,cogroup,*ByKey 操做都會在內存生成一個 hash-map或者 buffer 用於分組或者排序。join, cogroup ,groupByKey 會在 shuffle 時在 fetching 端使用這些數據結構, reduceByKey ,aggregateByKey 會在 shuffle 時在兩端都會使用這些數據結構。

當須要進行這個彙集操做的 record 不能徹底輕易塞進內存中時,一些問題會暴露出來。首先,在內存 hold 大量這些數據結構的 record 會增長 GC的壓力,可能會致使流程停頓下來。其次,若是數據不能徹底載入內存,Spark 會將這些數據寫到磁盤,這會引發磁盤 IO和排序。在 Cloudera 的用戶中,這多是致使 Spark Job 慢的首要緣由。

那麼如何增長你的 partition 的個數呢?若是你的問題 stage 是從 Hadoop 讀取數據,你能夠作如下的選項: 
- 使用 repartition 選項,會引起 shuffle; 
- 配置 InputFormat 用戶將文件分得更小; 
- 寫入 HDFS 文件時使用更小的block。

若是問題 stage 從其餘 stage 中得到輸入,引起 stage 邊界的操做會接受一個 numPartitions 的參數,好比

val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

X 應該取什麼值?最直接的方法就是作實驗。不停的將 partition 的個數從上次實驗的 partition 個數乘以1.5,直到性能再也不提高爲止。

同時也有一些原則用於計算 X,可是也不是很是的有效是由於有些參數是很難計算的。這裏寫到不是由於它們很實用,而是能夠幫助理解。這裏主要的目標是啓動足夠的 task 可使得每一個 task 接受的數據可以都塞進它所分配到的內存中。

每一個 task 可用的內存經過這個公式計算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默認值分別 0.2 和 0.8.

在內存中全部 shuffle 數據的大小很難肯定。最可行的是找出一個 stage 運行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之間的比例。在用全部shuffle 寫乘以這個比例。可是若是這個 stage 是 reduce 時,可能會有點複雜:

在往上增長一點由於大多數狀況下 partition 的個數會比較多。

試試在,在有所疑慮的時候,使用更多的 task 數(也就是 partition 數)都會效果更好,這與 MapRecuce 中建議 task 數目選擇儘可能保守的建議相反。這個由於 MapReduce 在啓動 task 時相比須要更大的代價。

壓縮你的數據結構

Spark 的數據流由一組 record 構成。一個 record 有兩種表達形式:一種是反序列化的 Java 對象另一種是序列化的二進制形式。一般狀況下,Spark 對內存中的 record 使用反序列化以後的形式,對要存到磁盤上或者須要經過網絡傳輸的record 使用序列化以後的形式。也有計劃在內存中存儲序列化以後的 record

spark.serializer 控制這兩種形式之間的轉換的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推薦的選擇。但不幸的是它不是默認的配置,由於 KryoSerializer 在早期的 Spark 版本中不穩定,而 Spark 不想打破版本的兼容性,因此沒有把 KryoSerializer 做爲默認配置,可是 KryoSerializer 應該在任何狀況下都是第一的選擇。

你的 record 在這兩種形式切換的頻率對於 Spark 應用的運行效率具備很大的影響。去檢查一下處處傳遞數據的類型,看看可否擠出一點水分是很是值得一試的。

過多的反序列化以後的 record 可能會致使數據處處到磁盤上更加頻繁,也使得可以 Cache 在內存中的 record 個數減小。點擊這裏查看如何壓縮這些數據。

過多的序列化以後的 record 致使更多的 磁盤和網絡 IO,一樣的也會使得可以 Cache 在內存中的 record 個數減小,這裏主要的解決方案是把全部的用戶自定義的 class 都經過 SparkConf#registerKryoClasses 的API定義和傳遞的。 

數據格式

任什麼時候候你均可以決定你的數據如何保持在磁盤上,使用可擴展的二進制格式好比:Avro,Parquet,Thrift或者Protobuf,從中選擇一種。當人們在談論在Hadoop上使用Avro,Thrift或者Protobuf時,都是認爲每一個 record 保持成一個 Avro/Thrift/Protobuf 結構保存成 sequence file。而不是JSON。

每次當時試圖使用JSON存儲大量數據時,仍是先放棄吧...

 

原文地址:

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

相關文章
相關標籤/搜索