spark技術總結(1)

1. 請描述spark RDD原理與特徵java

RDD爲Resilient Distributed Datasets縮寫,譯文彈性分佈式數據集。node

他是spark系統中的核心數據模型之一,另一個是DAG模型。git

它是「只讀」,「分區」的數據集合。其類內部有5個部分組成:github

1. 一組partition    partitions_ : Array[Partition]算法

2.每一個partition的計算函數 經過詩選compute函數達到這個目的。sql

3.RDD依賴關係,新的RDD能夠從已有的RDD轉換而來,當RDD中的分區丟失那麼能夠經過這個關係將這些數據從父RDD中從新計算。apache

4.一個分片函數partitioner,模式實現了了hashpartitioner和RangePartitioner。這個函數據定了當前RDD的分片數量,以及父RDD的shuffle輸出時候的分片數量。數組

5.一個列表,保存了各個分區的優先位置,好比HDFS文件來講就保存了每個分區的所在塊的位置。網絡

 

彈性分佈式數據集的字面彈性和分佈式的理解:app

分佈式很好理解,咱們的數據是通常存儲在hdfs分佈式文件系統的,因此RDD也是分佈式的。

彈性能夠理解爲兩層意思:1,數據是可存儲在磁盤或者內存,也能夠部分分區在內存部分分區存儲在磁盤。

            2,RDD的數據是能夠經過父RDD進行恢復,且能夠作到部分分區數據經過依賴關係進行恢復,這稱之爲「血統」。

 

RDD的產生:三種1,已有的scala的數據集合 2,外部存儲系統如s3 hdfs等 3,經過父RDD的轉換而來。

 

2.如何理解spark RDD寬依賴和窄依賴,並結合示例描述

spark從RDD依賴上來講分爲窄依賴和寬依賴。

其中能夠這樣區分是哪一種依賴:當父RDD的一個partition被子RDD的多個partitions引用到的時候則說明是寬依賴,不然爲窄依賴。

寬依賴會觸發shuffe,寬依賴也是一個job鐘不一樣stage的分界線。

本篇文章主要討論一下窄依賴的場景。

 

 3.基於spark-1.6.1版本,請描述spark統一內存管理機制,和各部份內存區域用途。

1.6版本以前使用的叫作StaticMemoryManager 1.6以及以後使用的內存管理機制叫作:UnifiedMemoryManager

相比於以前的內存管理機制,如今統一內存管理機制可讓storage、execution能夠互相從對方借用內存,整體來講解決了如何資源充分利用的問題。

 

上述爲統一內存模型,對相比最主要的區別在於Storage區域與Execution能夠動態分配。

  • 設定基本的存儲內存和執行內存區域(spark.storage.storageFraction 參數),該設定肯定了雙方各自擁有的空間的範圍
  • 雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
  • 執行內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後"歸還"借用的空間
  • 存儲內存的空間被對方佔用後,沒法讓對方"歸還",由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜[4]

 

 

4.spark中的RDD,dataframe和dataset有什麼區別。

 RDD,dataframe和dataset都是spark提供給開發者的三個主要API。

RDD是最底層的數據模型。屬於spark-core包=模塊中。

不管是dataframe仍是dataset其最終都是轉換爲RDD來運行的。

 

而dataframe和dataset則數據spark-sql模塊中。

dataframe則是特殊的dataset

type DataFrame = Dataset[Row]

 

 

其中dataset是強類型的,而dataframe則是無類型的。其中row便是一行帶有列名及列值的數據。

dataframe與dataset  同 RDD比較,主要是解決了元數據問題。RDD不包含元數據。處理數據不夠直觀,代碼閱讀性很差。

而dataframe與dataset則不一樣,帶有元數據,提供特定領域的轉換方法。其內部經過Catalyst模塊來進行執行優化,轉化爲RDD進行計算。

 

以上三種能夠相互轉換。 

 

 

5.請描述spark運行時job,stage,task的劃分原則,以及其相互間的關係。

 我的理解:一個action操做造成一個job,job包含一個或者多個stage,stage包含task組成的taskset。

stage的劃分點實在款依賴的RDD之間。同一個stage中都是窄依賴關係,各個分區能夠並行執行。

task則是相互獨立的分別做用在各個RDD分區之上的一個具體任務。

一個job下只分爲shuffleMapTask和ResultTask。

其中一個job的最後stage爲每個結果的partition執行task爲ResultTask

 

 

6.請描述spark hash shuffle與sort shuffle之間的區別和優劣點。

主要區別就是在於對於小文件的控制上。sort shuffle相比於 hash shuffle在map端產生的小文件更少。

有效下降IO,提升性能。目前sort shuffle是默認shuffle機制了。

hash shuffle在沒有consolidation狀況下將產生M*R個文件。M爲Mapper個數,R爲reducer個數。

以下圖,其每個map最壞的狀況下都將爲每個reducer產生一個獨立文件。

極端狀況下,當數據量比較大,M和R都很大的狀況下會產生大量的小文件。不管在你寫仍是在讀的時候都會有大量的IO消耗。

 

而sort shuffle則將上述hash shuffle基礎上,將一個mapper下產生的不一樣的reducer輸入文件寫到一個文件中,並額外經過一個索引文件去解析她。

這樣M端IO將減小,R端的讀取也將減小,從而降IO提升效率。

sort shuffle的寫以下圖:

 

7.spark計算容錯性如何保證。

Lineage機制和Checkpoint機制來保證。這個兩個機制是創建在RDD是隻讀和分區的概念之上的。

Lineage機制:RDD中分區存在父RDD的分區的依賴關係,當前RDD中的某部分分區數據丟失則能夠經過這個依賴關係進行數據從新計算重構。

窄依賴的依賴關係代價較小,寬依賴則代繳較大,存在冗餘計算。

Checkpoint機制:對於Lineage機制若是Lineage過長或者是存在寬依賴那麼使用重算則須要的代價過大,

Checkpoint機制本質就是講Lineage中的某個RDD寫入磁盤做爲檢查點,後續的子RDD數據丟失則能夠直接在檢查點開始計算,通常把寬依賴上作Checkpoint最爲合適。

 

8.請描述spark廣播變量原理與特色。

1. 廣播變量知足三個特色:

  1.不變,廣播變量是不可變的。

  2. 夠小,由於是放到executor的內存中的,因此不能太大。

  3. 放在executor內存中,在之上的task均可訪問。

2. 原理:

首先在driver中生產廣播變量,而後在應用的全部的executor都會複製這個廣播變量,在這個executor之上的全部task就會公用這個變量。

若是不用廣播變量那麼全部的task就會和RDD(沒有用廣播變量前)進行交互,產生較大的網絡和磁盤IO。

 

 

9.請描述spark yarn-client與yarn-cluster模式原理,並分析其特色。

yarn-client與yarn-cluster的的說法只會當spark on yarn的模式下。

主要區別在於driver(SparkContext)運行在nodemanager的AM上仍是運行在本地(如提交任務的機器上)。

另外誰啓動driver則誰負責調度。換句話說yarn-client的RM在申請資源啓動Executor後並不負責後續的task的調度。

當本地並不須要進行交互計算或者查詢的時候建議使用yarn-cluster模式。

 

 

 

10.請描述spark算子reduce與reduceByKey的區別。

reduce(function):數組中前兩個元素作function操做後的值在和第三個元素值作function操做,以此類推。

val c = sc.parallelize(1 to 10)

c.reduce((x, y) => x + y)//結果55

解釋爲:

1+2 = 3

3+3=6

6+4=10

10+5=15

15+6=21

21+7=28

28+8=36

36+9=45

45+10=55

        

 

reduceByKey(function):是對元素爲KV對的RDD中Key相同的元素的Value進行binary_function的reduce操做,

所以,Key相同的多個元素的值被reduce爲一個值,而後與原RDD中的Key組成一個新的KV對。

如:

val a = sc.parallelize(List((1,2),(1,3),(1,1),(3,4),(3,6),(3,5)))

a.reduceByKey((x,y) => x + y).collect

解釋爲:

  key 爲 1 的value相加:2+3+1 = 6  造成 (1,6)

  key爲 3的value相加:4+6+5 = 15 造成(3,15)

 

 

 

11.spark中任務調度的調度模式有幾種,分別描述實現原理

Spark中整體的任務調度以下圖:

 

spark目前支持兩種調度策略:

  SchedulingMode.FIFO 先入先出調度策略和 SchedulingMode.FAIR 公平調度策略 ,他們都是application級別的策略。

  1. SchedulingMode.FIFO :保證jobid較小的先調度,若是是同一個job,那麼stage的id小的先調度。 

   

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

  2. SchedulingMode.FAIR 公平調度策略 :rootpool下掛在的是一組pool,葉子節點是tasksetmanager,首先是肯定rootpool的子pool的調度順序,而後在子pool內部使用相同的

算法肯定調度順序。

  

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

   

12.spark on yarn模式下,spark executor oom時候,請列舉哪些能夠解決或者調優,請具體描述。

 1. 減小數據膨脹:

  a. 使用數組和基本類型來減小數據膨脹。

  b. 避免過多的小對象的嵌套

  c. 若是RAM 小於32 GB則能夠在spark-env.sh中設置-XX:+UseCompressedOops參數,來將指針地址長度從8位變爲4位。

2. 保持executor上的task的數據本地性。

  當數據非本地的時候將會遠程從其餘executor上加載數據到內存,當數據過大可能引發oom,咱們能夠調大spark.locality.wait

3.防止過大的partation。

  可使用repartition對RDD從新分區。

4.GC調優

  使用G1 回收策略等

5. 使用spark.rdd.compress 指定爲KryoSerializer

  能夠進一步壓縮在內存中的數據,而且比java自帶的要更加高效。

6.適當的調整spark.memory.storageFraction

  避免過多的借還。

 

13.請分析如下異常發生緣由,並概述spark監聽器機制。

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.

緣由:

val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}
上述當發生隊列容量不足的時候將會發生drop事件從而出現此日誌信息。


 

14.舉例分析spark數據傾斜現象,緣由和解決方法。

15.spark streaming中Dstream是如何產生,請描述過程。

16.spark streaming與kafka集成應用時,從kafka讀取數據的幾種獲取方式,並各有什麼不一樣。

17.請描述spark thriftServer增量(配置參數spark.sql.thriftServer.incrementalCollect)取數機制。

18.以「select count(*) from table」語句爲例子,描述spark task產生過程。

19.請說明spark-sql產生小文件的緣由,請描述其調優解決方法。

20.請舉例spark經常使用優化參數,請闡述其意義和應用場景。

相關文章
相關標籤/搜索