大數據計算平臺Spark內核全面解讀

1Spark介紹html

Spark是起源於美國加州大學伯克利分校AMPLab的大數據計算平臺,在2010年開源,目前是Apache軟件基金會的頂級項目。隨着Spark在大數據計算領域的暫露頭角,愈來愈多的企業開始關注和使用。201411月,SparkDaytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節點數,100TB數據的排序時間從72分鐘提升到了23分鐘算法

Spark在架構上包括內核部分和4個官方子模塊--Spark SQLSpark Streaming、機器學習庫MLlib和圖計算庫GraphX。圖1所示爲Spark在伯克利的數據分析軟件棧BDASBerkeley Data Analytics Stack)中的位置。可見Spark專一於數據的計算,而數據的存儲在生產環境中每每仍是由Hadoop分佈式文件系統HDFS承擔。shell

1 SparkBDAS中的位置 apache

Spark被設計成支持多場景的通用大數據計算平臺,它能夠解決大數據計算中的批處理,交互查詢及流式計算等核心問題。Spark能夠從多數據源的讀取數據,而且擁有不斷髮展的機器學習庫和圖計算庫供開發者使用。數據和計算在Spark內核及Spark的子模塊中是打通的,這就意味着Spark內核和子模塊之間成爲一個總體。Spark的各個子模塊以Spark內核爲基礎,進一步支持更多的計算場景,例如使用Spark SQL讀入的數據能夠做爲機器學習庫MLlib的輸入。表1列舉了一些在Spark平臺上的計算場景。數組

1 Spark的應用場景舉例緩存

在本文寫做是,Spark的最新版本爲1.2.0,文中的示例代碼也來自於這個版本。網絡

2Spark內核介紹 架構

相信大數據工程師都很是瞭解Hadoop MapReduce一個最大的問題是在不少應用場景中速度很是慢,只適合離線的計算任務。這是因爲MapReduce須要將任務劃分紅mapreduce兩個階段,map階段產生的中間結果要寫回磁盤,而在這兩個階段之間須要進行shuffle操做。Shuffle操做須要從網絡中的各個節點進行數據拷貝,使其每每成爲最爲耗時的步驟,這也是Hadoop MapReduce慢的根本緣由之一,大量的時間耗費在網絡磁盤IO中而不是用於計算。在一些特定的計算場景中,例如像邏輯迴歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。機器學習

Spark是若是設計分佈式計算的呢?首先咱們須要理解Spark中最重要的概念--彈性分佈數據集(Resilient Distributed Dataset),也就是RDD。 分佈式

2.1 彈性分佈數據集RDD

RDDSpark中對數據和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的並可以被並行操做的數據集合。對RDD的操做分爲兩種transformationactionTransformation操做是經過轉換從一個或多個RDD生成新的RDDAction操做是從RDD生成最後的計算結果。在Spark最新的版本中,提供豐富的transformationaction操做,比起MapReduce計算模型中僅有的兩種操做,會大大簡化程序開發的難度。

RDD的生成方式只有兩種,一是從數據源讀入,另外一種就是從其它RDD經過transformation操做轉換。一個典型的Spark程序就是經過Spark上下文環境(SparkContext)生成一個或多個RDD,在這些RDD上經過一系列的transformation操做生成最終的RDD,最後經過調用最終RDDaction方法輸出結果。

每一個RDD均可以用下面5個特性來表示,其中後兩個爲可選的:

  • 分片列表(數據塊列表)

  • 計算每一個分片的函數

  • 對父RDD的依賴列表

  • key-value類型的RDD的分片器(Partitioner)(可選)

  • 每一個數據分片的預約義地址列表(如HDFS上的數據塊的地址)(可選)

雖然Spark是基於內存的計算,但RDD不光能夠存儲在內存中,根據useDiskuseMemoryuseOffHeap, deserializedreplication五個參數的組合Spark提供了12種存儲級別,在後面介紹RDD的容錯機制時,咱們會進一步理解。值得注意的是當StorageLevel設置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基於內存的分佈式文件系統,目前正在快速發展,本文不作詳細介紹,能夠經過其官方網站進一步瞭解。

  1. class StorageLevel private(

  2.     private var _useDisk: Boolean,

  3.     private var _useMemory: Boolean,

  4.     private var _useOffHeap: Boolean,

  5.     private var _deserialized: Boolean

  6.     private var _replication: Int = 1)

  7.   extends Externalizable { //… }

  8.  

  9. val NONE = new StorageLevel(false, false, false, false)

  10.   val DISK_ONLY = new StorageLevel(true, false, false, false)

  11.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

  12.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)

  13.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

  14.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

  15.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

  16.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

  17.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

  18.   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

  19.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

  20.   val OFF_HEAP = new StorageLevel(false, false, true, false)

2.2 DAGStage與任務的生成

Spark的計算髮生在RDDaction操做,而對action以前的全部transformationSpark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。

Spark內核會在須要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。舉個例子,在圖2中,從輸入中邏輯上生成AC兩個RDD,通過一系列transformation操做,邏輯上生成了F,注意,咱們說的是邏輯上,由於這時候計算沒有發生,Spark內核作的事情只是記錄了RDD的生成和依賴關係。當F要進行輸出時,也就是F進行了action操做,Spark會根據RDD的依賴生成DAG,並從起點開始真正的計算。

2 邏輯上的計算過程:DAG 

有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分紅任務集,也就是Stage,這樣能夠將任務提交到計算節點進行真正的計算。Spark計算的中間結果默認是保存在內存中的,Spark在劃分Stage的時候會充分考慮在分佈式計算中可流水線計算(pipeline)的部分來提升計算的效率,而在這個過程當中,主要的根據就是RDD的依賴類型。根據不一樣的transformation操做,RDD的依賴能夠分爲窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中爲ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每一個partition只依賴於父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每個partition都依賴於父 RDD(s) 全部partition。窄依賴典型的操做有map, filter, union等,寬依賴典型的操做有groupByKey, sortByKey等。能夠看到,寬依賴每每意味着shuffle操做,這也是Spark劃分stage的主要邊界。對於窄依賴,Spark會將其儘可能劃分在同一個stage中,由於它們能夠進行流水線計算。

3 RDD的寬依賴和窄依賴

咱們再經過圖4詳細解釋一下Spark中的Stage劃分。咱們從HDFS中讀入數據生成3個不一樣的RDD,經過一系列transformation操做後再將計算結果保存回HDFS。能夠看到這幅DAG中只有join操做是一個寬依賴,Spark內核會以此爲邊界將其先後劃分紅不一樣的Stage. 同時咱們能夠注意到,在圖中Stage2中,從mapunion都是窄依賴,這兩步操做能夠造成一個流水線操做,經過map操做生成的partition能夠不用等待整個RDD計算結束,而是繼續進行union操做,這樣大大提升了計算的效率。

4 Spark中的Stage劃分 

Spark在運行時會把Stage包裝成任務提交,有父StageSpark會先提交父Stage。弄清楚了Spark劃分計算的原理,咱們再結合源碼看一看這其中的過程。下面的代碼是DAGScheduler中的獲得一個RDDStage的函數,能夠看到寬依賴爲劃分Stage的邊界。

  1. /**

  2.    * Get or create the list of parent stages for a given RDD. The stages will be assigned the

  3.    * provided jobId if they haven't already been created with a lower jobId.

  4.    */

  5.  

  6.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {

  7.     val parents = new HashSet[Stage]

  8.     val visited = new HashSet[RDD[_]]

  9.     // We are manually maintaining a stack here to prevent StackOverflowError

  10.     // caused by recursively visiting

  11.     val waitingForVisit = new Stack[RDD[_]]

  12.     def visit(r: RDD[_]) {

  13.       if (!visited(r)) {

  14.         visited += r

  15.         // Kind of ugly: need to register RDDs with the cache here since

  16.         // we can't do it in its constructor because # of partitions is unknown

  17.         for (dep <- r.dependencies) {

  18.           dep match {

  19.             case shufDep: ShuffleDependency[_, _, _] =>

  20.               parents += getShuffleMapStage(shufDep, jobId)

  21.             case _ =>

  22.               waitingForVisit.push(dep.rdd)

  23.           }

  24.         }

  25.       }

  26.     }

  27.  

  28.     waitingForVisit.push(rdd)

  29.     while (!waitingForVisit.isEmpty) {

  30.       visit(waitingForVisit.pop())

  31.     }

  32.     parents.toList

  33.   }

上面提到Spark的計算是從RDD調用action操做時候觸發的,咱們來看一個action的代碼

RDDcollect方法是一個action操做,做用是將RDD中的數據返回到一個數組中。能夠看到,在此action中,會觸發Spark上下文環境SparkContext中的runJob方法,這是一系列計算的起點。

  1. abstract class RDD[T: ClassTag](

  2.     @transient private var sc: SparkContext,

  3.     @transient private var deps: Seq[Dependency[_]]

  4.   ) extends Serializable with Logging {

  5.   //….

  6. /**

  7.    * Return an array that contains all of the elements in this RDD.

  8.    */

  9.   def collect(): Array[T] = {

  10.     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

  11.     Array.concat(results: _*)

  12.   }

  13. }

SparkContext擁有DAGScheduler的實例,在runJob方法中會進一步調用DAGSchedulerrunJob方法。在此時,DAGScheduler會生成DAGStage,將Stage提交給TaskSchedulerTaskSchdulerStage包裝成TaskSet,發送到Worker節點進行真正的計算,同時還要監測任務狀態,重試失敗和長時間無返回的任務。整個過程如圖5所示。

 

5 Spark中任務的生成 

2.3 RDD的緩存與容錯

上文提到,Spark的計算是從action開始觸發的,若是在action操做以前邏輯上不少transformation操做,一旦中間發生計算失敗,Spark會從新提交任務,這在不少場景中代價過大。還有一些場景,若有些迭代算法,計算的中間結果會被重複使用,重複計算一樣增長計算時間和形成資源浪費。所以,在提升計算效率和更好支持容錯,Spark提供了基於RDDcache機制和checkpoint機制。

咱們能夠經過RDDtoDebugString來查看其遞歸的依賴信息,圖6展現了在spark shell中經過調用這個函數來查看wordCount RDD的依賴關係,也就是它的Lineage.

6 RDD wordCountlineage 

若是發現Lineage過長或者裏面有被屢次重複使用的RDD,咱們就能夠考慮使用cache機制或checkpoint機制了。

咱們能夠經過在程序中直接調用RDDcache方法將其保存在內存中,這樣這個RDD就能夠被多個任務共享,避免重複計算。另外,RDD還提供了更爲靈活的persist方法,能夠指定存儲級別。從源碼中能夠看到RDD.cache就是簡單的調用了RDD.persist(StorageLevel.MEMORY_ONLY)

  1. /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

  2.   def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  3.   def cache(): this.type = persist()

一樣,咱們能夠調用RDDcheckpoint方法將其保存到磁盤。咱們須要在SparkContext中設置checkpoint的目錄,不然調用會拋出異常。值得注意的是,在調用checkpoint以前建議先調用cache方法將RDD放入內存,不然將RDD保存到文件的時候須要從新計算。 

  1.   /**

  2.    * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

  3.    * directory set with SparkContext.setCheckpointDir() and all references to its parent

  4.    * RDDs will be removed. This function must be called before any job has been

  5.    * executed on this RDD. It is strongly recommended that this RDD is persisted in

  6.    * memory, otherwise saving it on a file will require recomputation.

  7.    */

  8.   def checkpoint() {

  9.     if (context.checkpointDir.isEmpty) {

  10.       throw new SparkException("Checkpoint directory has not been set in the SparkContext")

  11.     } else if (checkpointData.isEmpty) {

  12.       checkpointData = Some(new RDDCheckpointData(this))

  13.       checkpointData.get.markForCheckpoint()

  14.     }

  15.   }

Cache機制和checkpoint機制的差異在於cacheRDD保存到內存,並保留Lineage,若是緩存失效RDD還能夠經過Lineage重建。而checkpointRDD落地到磁盤並切斷Lineage,由文件系統保證其重建。

2.4 Spark任務的部署

Spark的集羣部署分爲StandaloneMesosYarn三種模式,咱們以Standalone模式爲例,簡單介紹Spark程序的部署。如圖7示,集羣中的Spark程序運行時分爲3種角色,driver, masterworkerslave)。在集羣啓動前,首先要配置masterworker節點。啓動集羣后,worker節點會向master節點註冊本身,master節點會維護worker節點的心跳。Spark程序都須要先建立Spark上下文環境,也就是SparkContext。建立SparkContext的進程就成爲了driver角色,上一節提到的DAGSchedulerTaskScheduler都在driver中運行。Spark程序在提交時要指定master的地址,這樣能夠在程序啓動時向master申請worker的計算資源。Drivermasterworker之間的通訊由Akka支持。Akka 也使用 Scala 編寫,用於構建可容錯的、高可伸縮性的Actor 模型應用。關於Akka,能夠訪問其官方網站進行進一步瞭解,本文不作詳細介紹。

7 Spark任務部署

3、更深一步瞭解Spark內核

瞭解了Spark內核的基本概念和實現後,更深一步理解其工做原理的最好方法就是閱讀源碼。最新的Spark源碼能夠從Spark官方網站下載。源碼推薦使用IntelliJ IDEA閱讀,會自動安裝Scala插件。讀者能夠從core工程,也就是Spark內核工程開始閱讀,更能夠設置斷點嘗試跟蹤一個任務的執行。另外,讀者還能夠經過分析Spark的日誌來進一步理解Spark的運行機制,Spark使用log4j記錄日誌,能夠在啓動集羣前修改log4j的配置文件來配置日誌輸出和格式。

【編輯推薦】

  1. Spark:利用Eclipse構建Spark集成開發環境

  2. Spark實戰:單節點本地模式搭建Spark運行環境

  3. Spark:爲大數據處理點亮一盞明燈

  4. 專訪Spark亞太研究院王家林:從技術的角度探索Spark

  5. StormSpark:誰纔是咱們的實時處理利器

相關文章
相關標籤/搜索