Spark基本原理

Hadoop和Spark關係

Spark比Hadoop快的緣由:Hadoop在MapReduce後會將結果寫入磁盤,第二次MapReduce再取出,Spark去除了兩次運算間多餘的IO消耗,直接將數據緩存在內存中。html

Spark運行原理

提交做業->啓動Driver進程->申請資源,即Executor進程->做業代碼分拆爲stage執行->從上一次stage拉取所需key執行,直至任務完成->保存到Executor進程的內存或磁盤web

Spark基本運行原理

咱們使用spark-submit提交一個Spark做業以後,這個做業就會啓動一個對應的Driver進程。根據你使用的部署模式(deploy-mode)不一樣,Driver進程可能在本地啓動,也可能在集羣中某個工做節點上啓動。而Driver進程要作的第一件事情,就是向集羣管理器(能夠是Spark Standalone集羣,也能夠是其餘的資源管理集羣,美團•大衆點評使用的是YARN做爲資源管理集羣)申請運行Spark做業須要使用的資源,這裏的資源指的就是Executor進程。YARN集羣管理器會根據咱們爲Spark做業設置的資源參數,在各個工做節點上,啓動必定數量的Executor進程,每一個Executor進程都佔有必定數量的內存和CPU core。緩存

在申請到了做業執行所需的資源以後,Driver進程就會開始調度和執行咱們編寫的做業代碼了。Driver進程會將咱們編寫的Spark做業代碼分拆爲多個stage,每一個stage執行一部分代碼片斷,併爲每一個stage建立一批Task,而後將這些Task分配到各個Executor進程中執行。Task是最小的計算單元,負責執行如出一轍的計算邏輯(也就是咱們本身編寫的某個代碼片斷),只是每一個Task處理的數據不一樣而已。一個stage的全部Task都執行完畢以後,會在各個節點本地的磁盤文件中寫入計算中間結果,而後Driver就會調度運行下一個stage。下一個stage的Task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到將咱們本身編寫的代碼邏輯所有執行完,而且計算完全部的數據,獲得咱們想要的結果爲止。網絡

Spark是根據shuffle類算子來進行stage的劃分。若是咱們的代碼中執行了某個shuffle類算子(好比reduceByKey、join等),那麼就會在該算子處,劃分出一個stage界限來。能夠大體理解爲,shuffle算子執行以前的代碼會被劃分爲一個stage,shuffle算子執行以及以後的代碼會被劃分爲下一個stage。所以一個stage剛開始執行的時候,它的每一個Task可能都會從上一個stage的Task所在的節點,去經過網絡傳輸拉取須要本身處理的全部key,而後對拉取到的全部相同的key使用咱們本身編寫的算子函數執行聚合操做(好比reduceByKey()算子接收的函數),這個過程就是shuffle。多線程

當咱們在代碼中執行了cache/persist等持久化操做時,根據咱們選擇的持久化級別的不一樣,每一個Task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。併發

所以Executor的內存主要分爲三塊:第一塊是讓Task執行咱們本身編寫的代碼時使用,默認是佔Executor總內存的20%;第二塊是讓Task經過shuffle過程拉取了上一個stage的Task的輸出後,進行聚合等操做時使用,默認也是佔Executor總內存的20%;第三塊是讓RDD持久化時使用,默認佔Executor總內存的60%。分佈式

Task的執行速度是跟每一個Executor進程的CPU core數量有直接關係的。一個CPU core同一時間只能執行一個線程。而每一個Executor進程上分配到的多個Task,都是以每一個Task一條線程的方式,多線程併發運行的。若是CPU core數量比較充足,並且分配到的Task數量比較合理,那麼一般來講,能夠比較快速和高效地執行完這些Task線程。ide

RDD

  • RDD是Spark提供的核心抽象,全稱爲Resillient Distributed Dataset,即彈性分佈式數據集;
  • RDD最重要的特性就是,提供了容錯性,Spark能夠重算
  • RDD存放在內存中,可是內存不足時能夠寫入磁盤緩存(彈性)
  1. RDD分佈式是什麼意思?函數

    一個RDD,在邏輯上抽象地表明瞭一個HDFS文件;它其實是被分爲多個存放在spark不一樣節點上的分區。好比說,RDD有900萬數據。分爲9個partition,9個分區。oop

  2. RDD彈性是什麼意思,體如今哪一方面? 容錯性體如今哪方面?

    a.RDD自動進行內存和磁盤之間權衡和切換的機制,就是RDD的彈性的特色所在。b.當它發現本身的數據丟失,會自動從本身來源的數據進行重計算,這一切對用戶是徹底透明的。

shuffle和stage

shuffle是劃分DAG中stage的標識,同時影響 Spark 執行速度的關鍵步驟。以下 DAG 流程圖中,分別讀取數據,通過處理後 join 2個 RDD 獲得結果:

RDD 的Transformation函數中,又分爲窄依賴(narrow dependency)和寬依賴(wide dependency)的操做.窄依賴跟寬依賴的區別是是否發生shuffle(洗牌) 操做.寬依賴會發生shuffle操做. 窄依賴是子 RDD的各個分片(partition)不依賴於其餘分片,可以獨立計算獲得結果,寬依賴指 RDD 的各個分片會依賴於父RDD 的多個分片,因此會形成父 RDD 的各個分片在集羣中從新分片。例子:

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))

第一個Map操做將 RDD 裏的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,能夠在集羣的各個內存中獨立計算,也就是並行化,第二個 groupby以後的 Map 操做,爲了計算相同 key 下的元素個數,須要把相同 key 的元素彙集到同一個 partition 下,因此形成了數據在內存中的從新分佈,即 shuffle 操做.shuffle 操做是 spark中最耗時的操做,應儘可能避免沒必要要的 shuffle.

開發調優

  • 避免建立重複的RDD
  • 儘量複用同一個RDD
  • 對屢次使用的RDD進行持久化

資源參數調優

num-executors

  • 參數說明:該參數用於設置Spark做業總共要用多少個Executor進程來執行。Driver在向YARN集羣管理器申請資源時,YARN集羣管理器會盡量按照你的設置來在集羣的各個工做節點上,啓動相應數量的Executor進程。這個參數很是之重要,若是不設置的話,默認只會給你啓動少許的Executor進程,此時你的Spark做業的運行速度是很是慢的。
  • 參數調優建議:每一個Spark做業的運行通常設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都很差。設置的太少,沒法充分利用集羣資源;設置的太多的話,大部分隊列可能沒法給予充分的資源。

executor-memory

  • 參數說明:該參數用於設置每一個Executor進程的內存。Executor內存的大小,不少時候直接決定了Spark做業的性能,並且跟常見的JVM OOM異常,也有直接的關聯。
  • 參數調優建議:每一個Executor進程的內存設置4G~8G較爲合適。可是這只是一個參考值,具體的設置仍是得根據不一樣部門的資源隊列來定。能夠看看本身團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內存量的。此外,若是你是跟團隊裏其餘人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你本身的Spark做業佔用了隊列全部的資源,致使別的同窗的做業沒法運行。

executor-cores

  • 參數說明:該參數用於設置每一個Executor進程的CPU core數量。這個參數決定了每一個Executor進程並行執行task線程的能力。由於每一個CPU core同一時間只能執行一個task線程,所以每一個Executor進程的CPU core數量越多,越可以快速地執行完分配給本身的全部task線程。
  • 參數調優建議:Executor的CPU core數量設置爲2~4個較爲合適。一樣得根據不一樣部門的資源隊列來定,能夠看看本身的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每一個Executor進程能夠分配到幾個CPU core。一樣建議,若是是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其餘同窗的做業運行。

driver-memory

  • 參數說明:該參數用於設置Driver進程的內存。
  • 參數調優建議:Driver的內存一般來講不設置,或者設置1G左右應該就夠了。惟一須要注意的一點是,若是須要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題。

spark.default.parallelism

  • 參數說明:該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。
  • 參數調優建議:Spark做業的默認task數量爲500~1000個較爲合適。不少同窗常犯的一個錯誤就是不去設置這個參數,那麼此時就會致使Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。一般來講,Spark默認設置的數量是偏少的(好比就幾十個task),若是task數量偏少的話,就會致使你前面設置好的Executor的參數都前功盡棄。試想一下,不管你的Executor進程有多少個,內存和CPU有多大,可是task只有1個或者10個,那麼90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!所以Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,好比Executor的總CPU core數量爲300個,那麼設置1000個task是能夠的,此時能夠充分地利用Spark集羣的資源。

spark.storage.memoryFraction

  • 參數說明:該參數用於設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。也就是說,默認Executor 60%的內存,能夠用來保存持久化的RDD數據。根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
  • 參數調優建議:若是Spark做業中,有較多的RDD持久化操做,該參數的值能夠適當提升一些,保證持久化的數據可以容納在內存中。避免內存不夠緩存全部的數據,致使數據只能寫入磁盤中,下降了性能。可是若是Spark做業中的shuffle類操做比較多,而持久化操做比較少,那麼這個參數的值適當下降一些比較合適。此外,若是發現做業因爲頻繁的gc致使運行緩慢(經過spark web ui能夠觀察到做業的gc耗時),意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。

spark.shuffle.memoryFraction

  • 參數說明:該參數用於設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。
  • 參數調優建議:若是Spark做業中的RDD持久化操做較少,shuffle操做較多時,建議下降持久化操做的內存佔比,提升shuffle操做的內存佔比比例,避免shuffle過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。此外,若是發現做業因爲頻繁的gc致使運行緩慢,意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。
    ***

    http://blog.csdn.net/ap0810217/article/details/55195962
    https://tech.meituan.com/spark-tuning-basic.html
    http://blog.csdn.net/databatman/article/details/53023818

相關文章
相關標籤/搜索