spark 整理的一些知識

Spark 知識點java

  1. 請描述spark RDD原理與特徵?

 

RDD全稱是resilient distributed dataset(具備彈性的分佈式數據集)。一個RDD僅僅是一個分佈式的元素集合。在Spark中,全部工做都表示爲建立新的RDDs、轉換現有的RDDs,或者調用RDDs上的操做來計算結果。python

       Spark,一個RDD僅僅是一個不可變的分佈式對象集合.每一個RDD被切分紅多個能夠在不一樣集羣節點上進行計算的partitions.RDDs能夠包含Python,JavaScala任何類型的對象,包括用戶自定義的class文件。web

       用戶建立RDD的兩種方法:經過加載外部數據集或者經過在使用者運行的driver程序中分配一個對象集合。算法

一旦建立,RDD提供兩種操做類型:轉換(transformations)和執行(actions)Transformations會根據以前的RDD構造一個新的RDDsql

Transformationsactions的區別在於Spark計算RDD的方式.儘管你能夠隨時定義新的RDD,可是Spark只在他們第一次被action使用的時候進行計算。shell

最終,每當你在SparkRDD上運行一個action,默認會從新計算。若是你想在多個action上從新使用一個RDDRDD.persist()方法能夠進行保存。api

總的來說,每一個Spark項目或者shell對話都包含如下步驟:數組

  • 1.從外部數據建立一些輸入RDD
  • 2.經過使用transformations如filter()來轉換RDD成爲新的RDD
  • 3.使用persist()來持久化任何須要被重用的中間RDD
  • 4.啓動actions如count()和first()來啓動並行計算,而後Spark會對這些計算進行優化和執行

特徵:緩存

RDD上的transformation(轉換)是惰性求值,意味着Spark除非看到一個action(動做),不然不會開始執行。安全

  • 它是不變的數據結構存儲
  • 它是支持跨集羣的分佈式數據結構
  • 能夠根據數據記錄的key對結構進行分區
  • 提供了粗粒度的操做,且這些操做都支持分區
  • 它將數據存儲在內存中,從而提供了低延遲性

 

 

  1. 如何理解Spark RDD 寬依賴與窄依賴,並結合示例描述?

 

 

  1. 基於spark-1.6.1版本,請描述spark統一內存管理機制,和各部份內存區域的用途?

 

Spark統一內存管理主要是指Executor 端的內存模型,分爲堆內內存和堆外內存兩大區域。

(1),堆內內存

默認狀況下,executor僅僅使用堆內內存(參數spark.memory.offHeap.enabled設置爲true能夠開啓堆外內存),分爲4個部分:

Execution memory:主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程當中的臨時數據;

Storage memory:主要用於存儲 spark 的 cache 數據,例如RDD的緩存、unroll數據;

User Memory:主要用於存儲 RDD 轉換操做所須要的數據,例如 RDD 依賴等信息。

Reserved Memory:系統預留內存,會用來存儲Spark內部對象。

能夠歸納爲下圖:

Spark 內存管理

說明以下:

systemMemory: executor的內存總和,經過spark.executor.memory 或--executor-memory 配置

reservedMemory :默認爲300M,通常不可修改,測試場景下能夠經過spark.testing.reservedMemory進行指定。

usableMemory = systemMemory - reservedMemory,這個就是 Spark任務的可用內存。

經過參數spark.memory.fraction(默認0.6)和spark.memory.storageFraction(默認0.5)參數能夠調整User Memory,Execution memory,Storage memory的大小。

 

Execution memory Storage memory的內存具備共享特性,概述以下:

程序提交時咱們都會設定基本的 Execution 內存和 Storage 內存區域;

在程序運行時,若是雙方的空間都不足時,則存儲到硬盤;

Execution memory的空間充足,而storage memory不足時,storage能夠佔用execution memory。若是後續execution須要用到被佔用的內存時,能夠要求storage將相關數據轉存到硬盤來歸還內存

Storage memory的空間充足,而Execution memory不足時,Execution能夠佔用storage內存,可是在storage須要內存而execution仍然在佔用時,則沒法要求歸還,只能等待execution主動釋放。

 

(2),堆外內存

這種模式不在 JVM 內申請內存,而是調用 Java unsafe 相關 API 直接向操做系統申請內存,因爲這種方式不進過 JVM 內存管理,因此能夠避免頻繁的 GC,這種內存申請的缺點是必須本身編寫內存申請和釋放的邏輯。

默認狀況下堆外內存是關閉的,能夠經過spark.memory.offHeap.enabled參數開啓,經過spark.memory.offHeap.size參數設置堆外內存大小。

堆外內存只分爲:Execution  memory Storage memory,以下圖

Spark 內存管理

若是堆外內存被啓用,那麼 Executor 內將同時存在堆內和堆外內存,二者的使用互補影響,這個時候 Executor 中的 Execution 內存是堆內的 Execution 內存和堆外的 Execution 內存之和,同理,Storage 內存也同樣。

 

 

  1. Spark 中的rdd、dataframe和dataset有什麼區別?

 

RDDDataFrameDataset全都是spark平臺下的分佈式彈性數據集,爲處理超大型數據提供便利;都有惰性機制,都會根據spark的內存狀況自動緩存運算,三者都有partition的概念

區別以下:

 

RDD:

編譯時類型安全;支持非結構化數據;直接經過類名點的方式來操做數據;默認採用的是java序列化方式方式,序列化結果較大效率較低;數據存儲在java堆內存中,有可能致使頻繁gc;

 

DataFrame

編譯時不能進行類型轉化安全檢查DataFrame每一行的類型固定爲Row,只有經過解析才能獲取各個字段的值,每一列的值無法用類點名直接訪問;支持結構化數據;採用Kryo進行序列化,並且序列化時不須要帶上元信息,大大的減小了序列化大小;數據能夠保存在堆外內存中,減小了gc次數;支持Rpython

Dataset

支持結構化和非結構化數據;編譯時類型安全;能夠採用堆外內存進行存儲;Dataset中,每一行是什麼類型是不必定的

 

  1. 請詳細闡述spark運行時job、stage、task的劃分原則,以及其相互之間的關係?

Spark經過action切分出job,每一個job裏面經過shuffle切分stage,而每個stage裏面有若干task,task與RDD的partition對應,一個RDD有若干個partition,stage對輸入RDD的處理時,每一個partition就對應了一個task。

 

  1. 請描述spark hash shuffle 與 sort shuffle 之間的區別與優劣點

在Spark的源碼中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。而隨着Spark的版本的發展,ShuffleManager也在不斷迭代,變得愈來愈先進。

在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。

所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。

下面咱們詳細分析一下HashShuffleManager和SortShuffleManager的原理。

(1)HashShuffleManager運行原理

未經優化的HashShuffleManager

下圖說明了未經優化的HashShuffleManager的原理。這裏咱們先明確一個假設前提:每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。

咱們先從shuffle write開始提及。shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「分類」。所謂「分類」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。

那麼每一個執行shuffle write的task,要爲下一個stage建立多少個磁盤文件呢?很簡單,下一個stage的task有多少個,當前stage的每一個task就要建立多少份磁盤文件。好比下一個stage總共有100個task,那麼當前stage的每一個task都要建立100份磁盤文件。若是當前stage有50個task,總共有10個Executor,每一個Executor執行5個Task,那麼每一個Executor上總共就要建立500個磁盤文件,全部Executor上會建立5000個磁盤文件。因而可知,未經優化的shuffle write操做所產生的磁盤文件的數量是極其驚人的。

接着咱們來講說shuffle read。shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task給下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果。

http://tech.meituan.com/img/spark-tuning/hash-shuffle-common.png

 

優化後的HashShuffleManager

下圖說明了優化後的HashShuffleManager的原理。這裏說的優化,是指咱們能夠設置一個參數,spark.shuffle.consolidateFiles。該參數默認值爲false,將其設置爲true便可開啓優化機制。一般來講,若是咱們使用HashShuffleManager,那麼都建議開啓這個選項。

開啓consolidate機制以後,在shuffle write過程當中,task就不是爲下游stage的每一個task建立一個磁盤文件了。此時會出現shuffleFileGroup的概念,每一個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就能夠並行執行多少個task。而第一批並行執行的每一個task都會建立一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。

當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會複用以前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。所以,consolidate機制容許不一樣的task複用同一批磁盤文件,這樣就能夠有效將多個task的磁盤文件進行必定程度上的合併,從而大幅度減小磁盤文件的數量,進而提高shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共仍是有10個Executor,每一個Executor執行5個task。那麼本來使用未經優化的HashShuffleManager時,每一個Executor會產生500個磁盤文件,全部Executor會產生5000個磁盤文件的。可是此時通過優化以後,每一個Executor建立的磁盤文件的數量的計算公式爲:CPU core的數量 * 下一個stage的task數量。也就是說,每一個Executor此時只會建立100個磁盤文件,全部Executor只會建立1000個磁盤文件。

(2)SortShuffleManager運行原理

SortShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認爲200),就會啓用bypass機制。

普通運行機制

下圖說明了普通的SortShuffleManager的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。

在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是經過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢以後再一次寫入磁盤文件中,這樣能夠減小磁盤IO次數,提高性能。

一個task將全部數據寫入內存數據結構的過程當中,會發生屢次磁盤溢寫操做,也就會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,這就是merge過程,此時會將以前全部臨時磁盤文件中的數據讀取出來,而後依次寫入最終的磁盤文件之中。此外,因爲一個task就只對應一個磁盤文件,也就意味着該task爲下游stage的task準備的數據都在這一個文件中,所以還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量。好比第一個stage有50個task,總共有10個Executor,每一個Executor執行5個task,而第二個stage有100個task。因爲每一個task最終只有一個磁盤文件,所以此時每一個Executor上只有5個磁盤文件,全部Executor只有50個磁盤文件。

http://tech.meituan.com/img/spark-tuning/sort-shuffle-common.png

 

bypass運行機制

下圖說明了bypass SortShuffleManager的原理。bypass運行機制的觸發條件以下:

•       shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。

•       不是聚合類的shuffle算子(好比reduceByKey)。

此時task會爲每一個下游task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不一樣在於:第一,磁盤寫機制不一樣;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。

http://tech.meituan.com/img/spark-tuning/sort-shuffle-bypass.png

 

  1. spark 計算的容錯性如何保證?

 

spark的容錯性依賴於Lineage機制與Checkpoint機制。RDDLineage記錄的是粗顆粒度的特定數據Transformation操做(如filtermapjoin等)行爲。當這個RDD的部分分區數據丟失時,它能夠經過Lineage獲取足夠的信息來從新運算和恢復丟失的數據分區;RDDLineage依賴方面分爲兩種:窄依賴與寬依賴,用來解決數據容錯的高效性。

第一,窄依賴能夠在某個計算節點上直接經過計算父RDD的某塊數據計算獲得子RDD對應的某塊數據;寬依賴則要等到父RDD全部數據都計算完成以後,而且父RDD的計算結果進行hash並傳到對應節點上以後才能計算子RDD

第二,數據丟失時,對於窄依賴只須要從新計算丟失的那一塊數據來恢復;對於寬依賴則要將祖先RDD中的全部數據塊所有從新計算來恢復。因此在長血統鏈特別是有寬依賴的時候,須要在適當的時機設置數據檢查點。也是這兩個特性要求對於不一樣依賴關係要採起不一樣的任務調度機制和容錯恢復機制

RDD計算中,經過檢查點機制進行容錯,傳統作檢查點有兩種方式:經過冗餘數據和日誌記錄更新操做。在RDD中的doCheckPoint方法至關於經過冗餘數據來緩存數據,而以前介紹的血統就是經過至關粗粒度的記錄更新操做來實現容錯的。

檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。

 

 

  1. 請描述spark 廣播變量原理與特色?

 

Spark廣播變量能夠解決閉包函數引用外部大變量引發的性能問題。Spark使用高效的廣播算法分發廣播變量,以下降通訊成本。廣播變量會以只讀形式緩存在每一個機器的本地,可使用它們以高效的方式爲每一個節點提供大型輸入數據集的副本。

Broadcast的實現有兩種:HTTPBroadcastTorrentBroadcast;HTTPBroadcast是經過http來傳輸,該方式可能會形成Driver所在的節點網絡堵塞。TorrentBroadcast。相似於常見的BitTorrent技術,基本思想是將data切分爲一組blocks存儲於DriverBlockManager中。假設如今有一些Executors獲取到了一些blocks,那麼這些Executors就能夠成爲data server。隨着fetchExecutor愈來愈多,也就意味着更多的data server加入,那麼data就很快可以傳輸到全部Executor

TorrentBroadcast具體實現的原理是:

Driver端:

1. Driver先把data序列化爲byte array,而後再切割成blockSize大小的data blocksArray[ByteBuffer])。

2. 完成切割後,將每一個分塊數據存儲到driver本身的BlockManager中,StorageLevelMEMORY_AND_DISK_SER,當存儲完畢後會向BlockManagerMaster進行彙報。

 

Executor端:

1. 首先從本地查詢是否緩存了完整的data,若是查詢到了,則當即返回;不然調用readBlocksDriver或者其餘Executor拉取 Torrent 塊。

2. 新建一組Array[BlockData](numBlocks)用於存儲從遠程拉取過來的Block,接着按照隨機的索引順序(假設有5個塊,那麼打亂以後,拉取的索引順序可能爲3-1-4-2-5)一個個的去fetch block data

3. 每一個block data都有一個惟一ide.g. broadcast_xx_piece_0),首先會根據這個idBlockManager查詢本地是否有數據,若是有則將數據放到數組中索引對應的位置;不然,根據idBlockManager從遠程拉取數據。

4. BlockManager首先會向Driver查詢該id對應的塊在哪些locations上存在,當前Executor就會隨機選擇一個location進行下載,下載完成後,會報告給BlockManagerMaster。隨着下載的Executor越多,那麼這個location也就越多,data block服務器也就越多。

5. 當獲取到全部的BlockData以後(全部的BlockData都將存儲在Array[BlockData]中),接着會對該array中的塊數據進行反序列化獲得原始的data。最後,會將data放到當前ExecutorBlockManager中,那麼之後再次獲取data時,就能夠直接從本地獲得。

 

廣播變量的一個最重要的特色就是,在同一個執行器上的全部任務均可以共享此Broadcast,而不是每一個任務使用一個變量副本;還有在使用廣播變量作join操做時,不會產生shuffle

 

 

  1. Spark中的任務調度的調度模式有幾種,分別描述其實現原理?

 

spark的調度模式分爲兩種:FIFO(先進先出)和FAIR(公平調度)。默認是FIFO,即誰先提交誰先執行,而FAIR支持在調度池中再進行分組,能夠有不一樣的權重,根據權重、資源等來決定誰先執行, 例如能夠爲重要的jobs建立高優先級的池,或者把不一樣用戶的job放到不一樣的組,而後給用戶配置相同的資源量從而不至於某些用戶的做業少而獲得更少的資源。spark的調度模式能夠經過spark.scheduler.mode進行設置。

在DAGScheluer對job劃分好stage並以TaskSet的形式提交給TaskScheduler後,TaskScheduler的實現類會爲每一個TaskSet建立一個TaskSetMagager對象,並將該對象添加到調度池中。schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在建立TaskSchedulerImpl的時候經過scheduler.initialize(backend)的initialize方法對schedulableBuilder進行了實例化。程序會根據配置來建立不一樣的調度池,schedulableBuilder有兩種實現,分別是FIFOSchedulableBuilder和FairSchedulableBuilder。

 

FIFOSchedulableBuilder啥也沒幹。FairSchedulableBuilder的buildPools方法中會先去讀取FAIR模式的配置文件默認位於SPARK_HOME/conf/fairscheduler.xml,也能夠經過參數spark.scheduler.allocation.file設置用戶自定義配置文件。FAIR能夠配置多個調度池,即rootPool裏面仍是一組Pool,Pool中包含了TaskSetMagager。 FairSchedulableBuilder會根據配置文件建立buildFairSchedulerPool。根據每一個字段值(未設置則爲默認值)來實例化一個Pool對象,並添加到rootPool中。

 

一個spark應用程序包含一個TaskScheduler,一個TaskScheduler包含一個惟一的RootPool,FIFO只有一層Pool,包含TaskSetMagager,而FARI包含兩層Pool,RootPool包含子Pool,子Pool包含TaskSetMagager,RootPool都是在實例化SchedulableBuilder的時候建立的。兩種調度模式的最終實現都是同樣,不過FAIR會在添加以前會獲取須要使用的調度池,默認爲名字爲default的調度池。對於FIFO而言,parentPool都是RootPool,而FAIR,TaskSetMagager的parentPool都是RootPool的子Pool。

 

FIFO模式的算法類是FIFOSchedulingAlgorithm:

一、先比較priority,在FIFO中該優先級其實是Job ID,越早提交的job的jobId越小,priority越小,優先級越高。

二、若priority相同,則說明是同一個job裏的TaskSetMagager,則比較StageId,StageId越小優先級越高。

 

FAIR模式的算法實現類是FairSchedulingAlgorithm:

一、調度池運行的task數小於minShare的優先級比不小於的優先級要高。

二、若二者運行的task個數都比minShare小,則比較minShare使用率,使用率約低優先級越高。

三、若二者的minShare使用率相同,則比較權重使用率,使用率約低優先級越高。

四、若權重也相同,則比較名字。

 

 

  1. 請描述spark 算子reduce與reduceByKey的區別?

 

reduce(binary_function)

reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最後只有一個值爲止。

 

val c = sc.parallelize(1 to 4)

c.reduce((x, y) => x + y)

//結果10

 

具體過程,RDD有1 2 3 4個元素,

1+2=3

3+3=6

6+4=10

 

reduceByKey(binary_function)

reduceByKey就是對元素爲KV對的RDD中Key相同的元素的Value進行binary_function的reduce操做,所以,Key相同的多個元素的值被reduce爲一個值,而後與原RDD中的Key組成一個新的KV對。

 

val a = sc.parallelize(List((1,2),(1,3),(3,4),(3,6)))

a.reduceByKey((x,y) => x + y).collect    

//結果 Array((1,5), (3,10))

具體過程,

(1,2),(1,3)被分到一組進行求和獲得(1,5)

(3,4),(3,6) 被分到一組進行求和(3,10)

 

 

  1. 請闡述spark yarn-client與yarn-cluster模式的原理,並分析其特色?

 

答: Yarn中,每一個application都有一個Application Master進程,負責從ResourceManager中申請資源。

 

Yarn-cluster模式下,driver運行在被YARN管理的Appliaction Master進程中。Client將其Application交給RM後能夠當即關閉而沒必要持續到Application的生命週期。

http://www.aboutyun.com/data/attachment/forum/201503/31/152454ecw4pe8vweo8aw65.png

 

Yarn-client中,driver運行在client進程中,Application Master僅僅從Yarn中申請資源給Executor,以後client會跟container通訊進行做業的調度。因此client得一直運行直到Application結束。

http://www.aboutyun.com/data/attachment/forum/201503/31/152418d6mesxsa9k0awcsc.png

In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

 

通常來講,若是提交任務的節點和Worker集羣在同一個網絡內,此時client mode比較合適。若是提交任務的節點和Worker集羣相隔比較遠,就會採用cluster mode來最小化DriverExecutor之間的網絡延遲。

 

 

  1. Spark on yarn模式下,Spark executor oomjava.lang.OutOfMemoryError)時,請列舉有哪些能夠解決/調優的方式,請具體描述?

Diagnostic Messages for this Task:

Container [pid=28020,containerID=container_1459428698943_31555_01_004570] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.0 GB of 16.8 GB virtual memory used. Killing container

 

答:

1) map過程產生大量對象致使內存溢出:

這種溢出的緣由是在單個map中產生了大量的對象致使的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操做在rdd中,每一個對象都產生了10000個對象,這確定很容易產生內存溢出的問題。針對這種問題,在不增長內存的狀況下,能夠經過減小每一個Task的大小,以便達到每一個Task即便產生大量的對象Executor的內存也可以裝得下。具體作法能夠在會產生大量對象的map操做以前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。

 

2)shuffle後內存溢出:

    shuffle內存溢出的狀況能夠說都是shuffle後,單個文件過大致使的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,須要傳入一個partitioner,大部分Spark中的shuffle操做,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數經過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,因此若是是別的Partitioner或者本身實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的併發量了。若是是別的partitioner致使的shuffle內存溢出,就須要從partitioner的代碼增長partitions的數量。

 

3) standalone模式下資源分配不均勻致使內存溢出:

在standalone的模式下若是配置了--total-executor-cores 和 --executor-memory 這兩個參數,可是沒有配置--executor-cores這個參數的話,就有可能致使,每一個Executor的memory是同樣的,可是cores的數量不一樣,那麼在cores數量多的Executor中,因爲可以同時執行多個Task,就容易致使內存溢出的狀況。這種狀況的解決方法就是同時配置--executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。

 

4) 在內存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():

    rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數據會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間。

 

 

  1. 請分析如下異常發生緣由,並概述spark 監聽器機制?

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler

 

  1. 舉例分析spark數據傾斜現象、緣由及解決方法?

 

  1. Spark streaming中Dstream是如何產生,請描述其過程?

 

根據時間實例產生RDDs,和batchDuration對齊的,如:timer實例就是1秒,1秒生成一個RDD,

每一個RDD對應一個Job,由於RDD就是DStream操做的時間間隔的最後一個RDD,後面的RDD對前面

的RDD有依賴關係,後面對前面有依賴能夠推到出整個依賴鏈條。

 

  1. Spark streaming與kafka集成應用時,從kafka讀取數據有幾種獲取方式,且各有什麼不一樣?

 

1、有兩種方式:

①Receiver-base

②Direct

Receiver-base這種方式是先把數據從kafka中讀取出來,而後緩存在內存,再定時處理。Receiver-based的Kafka讀取方式是基於Kafka高階(high-level) api來實現對Kafka數據的消費。在提交Spark Streaming任務後,Spark集羣會劃出指定的Receivers來專門、持續不斷、異步讀取Kafka的數據,讀取時間間隔以及每次讀取offsets範圍能夠由參數來配置。讀取的數據保存在Receiver中,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等。當driver 觸發batch任務的時候,Receivers中的數據會轉移到剩餘的Executors中去執行

Direct方式採用Kafka簡單的consumer api方式來讀取數據,無需經由ZooKeeper,此種方式再也不須要專門Receiver來持續不斷讀取數據。當batch任務觸發時,由Executor讀取數據,並參與到其餘Executor的數據計算過程當中去。driver來決定讀取多少offsets,並將offsets交由checkpoints來維護。將觸發下次batch任務,再由Executor讀取Kafka數據並計算。今後過程咱們能夠發現Direct方式無需Receiver讀取數據,而是須要計算時再讀取數據,因此Direct方式的數據消費對內存的要求不高,只須要考慮批量計算所須要的內存便可;另外batch任務堆積時,也不會影響數據堆積

 

2、兩種方式的優缺點以下:

Direct方式具備如下方面的優點:

一、簡化並行(Simplified Parallelism)。不現須要建立以及union多輸入源,Kafka topic的partition與RDD的partition一一對應

二、高效(Efficiency)。Receiver-based保證數據零丟失(zero-data loss)須要配置spark.streaming.receiver.writeAheadLog.enable,此種方式須要保存兩份數據,浪費存儲空間也影響效率。而Direct方式則不存在這個問題。

三、強一致語義(Exactly-once semantics)。High-level數據由Spark Streaming消費,可是Offsets則是由Zookeeper保存。經過參數配置,能夠實現at-least once消費,此種狀況有重複消費數據的可能。

四、下降資源。Direct不須要Receivers,其申請的Executors所有參與到計算任務中;而Receiver-based則須要專門的Receivers來讀取Kafka數據且不參與計算。所以相同的資源申請,Direct 可以支持更大的業務。

五、下降內存。Receiver-based的Receiver與其餘Exectuor是異步的,並持續不斷接收數據,對於小業務量的場景還好,若是遇到大業務量時,須要提升Receiver的內存,可是參與計算的Executor並沒有需那麼多的內存。而Direct 由於沒有Receiver,而是在計算時讀取數據,而後直接計算,因此對內存的要求很低。實際應用中咱們能夠把原先的10G降至如今的2-4G左右。

六、魯棒性更好。Receiver-based方法須要Receivers來異步持續不斷的讀取數據,所以遇到網絡、存儲負載等因素,致使實時任務出現堆積,但Receivers卻還在持續讀取數據,此種狀況很容易致使計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,纔會讀取數據並計算。隊列出現堆積並不會引發程序的失敗。

Direct方式的缺點:

提升成本。Direct須要用戶採用checkpoint或者第三方存儲來維護offsets,而不像Receiver-based那樣,經過ZooKeeper來維護Offsets,此提升了用戶的開發成本。

監控可視化。Receiver-based方式指定topic指定consumer的消費狀況均能經過ZooKeeper來監控,而Direct則沒有這種便利,若是作到監控並可視化,則須要投入人力開發。

 

Receive-base優勢:

一、Kafka的high-level數據讀取方式讓用戶能夠專一於所讀數據,而不用關注或維護consumer的offsets,這減小用戶的工做量以及代碼量並且相對比較簡單。

Receive-base的缺點:

一、防數據丟失。作checkpoint操做以及配置spark.streaming.receiver.writeAheadLog.enable參數,配置spark.streaming.receiver.writeAheadLog.enable參數,每次處理以前須要將該batch內的日誌備份到checkpoint目錄中,這下降了數據處理效率,反過來又加劇了Receiver端的壓力;另外因爲數據備份機制,會受到負載影響,負載一高就會出現延遲的風險,致使應用崩潰。

二、單Receiver內存。因爲receiver也是屬於Executor的一部分,那麼爲了提升吞吐量,提升Receiver的內存。可是在每次batch計算中,參與計算的batch並不會使用到這麼多的內存,致使資源嚴重浪費。

三、在程序失敗恢復時,有可能出現數據部分落地,可是程序失敗,未更新offsets的狀況,這致使數據重複消費。

四、提升並行度,採用多個Receiver來保存Kafka的數據。Receiver讀取數據是異步的,並不參與計算。若是開較高的並行度來平衡吞吐量很不划算。五、Receiver和計算的Executor的異步的,那麼遇到網絡等因素緣由,致使計算出現延遲,計算隊列一直在增長,而Receiver則在一直接收數據,這很是容易致使程序崩潰。

六、採用MEMORY_AND_DISK_SER下降對內存的要求。可是在必定程度上影響計算的速度

 

  1. 請描述Spark thriftServer增量(配置參數spark.sql.thriftServer.incrementalCollect)取數機制

 

SparkthirfServer是一個基於HiveServer2開發的用於多用戶訪問Spark的服務器,提供了類JDBC接口使得任何使用BI工具的用戶可以方便接入Spark.

 

配置參數spark.sql.thriftServer.incrementalCollect的取值爲boolean類型,默認取值爲false

 

當值爲false的時候,採用collect取數機制,這種機制會一次性從spark獲取全部的數據返回給JDBC,這就意味着一個JDBC請求返回的大量數據集所有放在Spark sql thriftServer的堆內存裏面,這樣可能會致使內存溢出,可經過增長堆內存和手動分頁來規避風險。

 

當取值爲true時,採用叫作tolocalIterator的取數機制,具體爲一次只返回一個Spark分區的數值,這樣會對性能表現有所影響,可是會減小thriftSerer堆內存消耗。

 

 

  1. 以「select count(*) from table」的sql語言爲例,描述spark task產生過程?

 

總共有7個步驟:

  1. SQL語句通過SqlParser解析成Unresolved LogicalPlan;

SalParser匹配select 語句,獲取Distinct語句,投影字段projection,表relation,依次將匹配的字符串層層封裝,最終造成一顆LogicPlan的Tree

  1. 使用analyzer結合數據字典(catalog)進行綁定,生成ResolvedLogicalPlan
  2. 使用optimizer對ResolvedLogicalPlan進行優化,生成Optimized LogicalPlan
  3. 使用SparkPlan將LogicalPlan轉化成PhysicalPlan
  4. 使用prepareForExecution將PhysicalPlan轉換成可執行物理計劃
  5. 使用execute() 執行可執行物理計劃,生成SchemaRDD
  6. 建立task

 

 

  1. 請說明Spark-sql產生小文件的緣由,請描述其調優/解決方法?

 

答:產生小文件緣由:Spark默認在執行聚合(即shuffle)時,會多線程並行往hdfs寫數據(由於每一個DataFrame/RDD分紅若干個Partition,這些partition能夠被並行處理,默認有200個分區,由conf變量「spark.sql.shuffle.partitions」定義)。其結果就是一個存下來的文件,實際上是hdfs中一個目錄,在這個目錄下才是衆多partition對應的文件,最壞的狀況是出現好多size爲0的文件。

調優/解決辦法:調節參數spark.sql.shuffle.partitions,適當下降它的並行度;使用reparation(num)=coalesce(num, true)函數重分區coalesce和repartition,合併小文件

 

 

  1. 請列舉spark 經常使用的優化參數,並闡述其意義和應用場景?

 

①num-executors

 參數說明:該參數用於設置Spark做業總共要用多少個Executor進程來執行。Driver在向YARN集羣管理器申請資源時,YARN集羣管理器會盡量按照你的設置來在集羣的各個工做節點上,啓動相應數量的Executor進程。這個參數很是之重要,若是不設置的話,默認只會給你啓動少許的Executor進程,此時你的Spark做業的運行速度是很是慢的。

參數調優建議:每一個Spark做業的運行通常設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都很差。設置的太少,沒法充分利用集羣資源;設置的太多的話,大部分隊列可能沒法給予充分的資源。

 

②executor-memory

 參數說明:該參數用於設置每一個Executor進程的內存。Executor內存的大小,不少時候直接決定了Spark做業的性能,並且跟常見的JVM OOM異常,也有直接的關聯。

 參數調優建議:每一個Executor進程的內存設置4G~8G較爲合適。可是這只是一個參考值,具體的設置仍是得根據不一樣部門的資源隊列來定。能夠看看本身團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,就表明了你的Spark做業申請到的總內存量(也就是全部Executor進程的內存總和),這個量是不能超過隊列的最大內存量的。此外,若是你是跟團隊裏其餘人共享這個資源隊列,那麼申請的總內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你本身的Spark做業佔用了隊列全部的資源,致使別的同窗的做業沒法運行。

 

③executor-cores

 參數說明:該參數用於設置每一個Executor進程的CPU core數量。這個參數決定了每一個Executor進程並行執行task線程的能力。由於每一個CPU core同一時間只能執行一個task線程,所以每一個Executor進程的CPU core數量越多,越可以快速地執行完分配給本身的全部task線程。

參數調優建議:Executor的CPU core數量設置爲2~4個較爲合適。一樣得根據不一樣部門的資源隊列來定,能夠看看本身的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每一個Executor進程能夠分配到幾個CPU core。一樣建議,若是是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其餘同窗的做業運行。

 

④driver-memory

 參數說明:該參數用於設置Driver進程的內存。

 參數調優建議:Driver的內存一般來講不設置,或者設置1G左右應該就夠了。惟一須要注意的一點是,若是須要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題。

 

⑤spark.default.parallelism

 參數說明:該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。

 參數調優建議:Spark做業的默認task數量爲500~1000個較爲合適。不少同窗常犯的一個錯誤就是不去設置這個參數,那麼此時就會致使Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。一般來講,Spark默認設置的數量是偏少的(好比就幾十個task),若是task數量偏少的話,就會致使你前面設置好的Executor的參數都前功盡棄。試想一下,不管你的Executor進程有多少個,內存和CPU有多大,可是task只有1個或者10個,那麼90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!所以Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,好比Executor的總CPU core數量爲300個,那麼設置1000個task是能夠的,此時能夠充分地利用Spark集羣的資源。

 

⑥spark.storage.memoryFraction

 參數說明:該參數用於設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。也就是說,默認Executor 60%的內存,能夠用來保存持久化的RDD數據。根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

參數調優建議:若是Spark做業中,有較多的RDD持久化操做,該參數的值能夠適當提升一些,保證持久化的數據可以容納在內存中。避免內存不夠緩存全部的數據,致使數據只能寫入磁盤中,下降了性能。可是若是Spark做業中的shuffle類操做比較多,而持久化操做比較少,那麼這個參數的值適當下降一些比較合適。此外,若是發現做業因爲頻繁的gc致使運行緩慢(經過spark web ui能夠觀察到做業的gc耗時),意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。

 

⑦spark.shuffle.memoryFraction

 參數說明:該參數用於設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。

 參數調優建議:若是Spark做業中的RDD持久化操做較少,shuffle操做較多時,建議下降持久化操做的內存佔比,提升shuffle操做的內存佔比比例,避免shuffle過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。此外,若是發現做業因爲頻繁的gc致使運行緩慢,意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。

相關文章
相關標籤/搜索