上萬字詳解Spark Core(建議收藏)

🧡先來一個問題,也是面試中常問的:node

Spark爲何會流行?面試

緣由1:優秀的數據模型和豐富計算抽象算法

Spark 產生以前,已經有MapReduce這類很是成熟的計算系統存在了,並提供了高層次的API(map/reduce),把計算運行在集羣中並提供容錯能力,從而實現分佈式計算。編程

雖然MapReduce提供了對數據訪問和計算的抽象,可是對於數據的複用就是簡單的將中間數據寫到一個穩定的文件系統中(例如HDFS),因此會產生數據的複製備份,磁盤的I/O以及數據的序列化,因此在遇到須要在多個計算之間複用中間結果的操做時效率就會很是的低。而這類操做是很是常見的,例如迭代式計算,交互式數據挖掘,圖計算等。api

認識到這個問題後,學術界的 AMPLab 提出了一個新的模型,叫作 RDD。RDD 是一個能夠容錯且並行的數據結構(其實能夠理解成分佈式的集合,操做起來和操做本地集合同樣簡單),它可讓用戶顯式的將中間結果數據集保存在內存中,而且經過控制數據集的分區來達到數據存放處理最優化.同時 RDD也提供了豐富的 API (map、reduce、filter、foreach、redeceByKey...)來操做數據集。後來 RDD被 AMPLab 在一個叫作 Spark 的框架中提供並開源。數組

簡而言之,Spark 借鑑了 MapReduce 思想發展而來,保留了其分佈式並行計算的優勢並改進了其明顯的缺陷。讓中間數據存儲在內存中提升了運行速度、並提供豐富的操做數據的API提升了開發速度。緩存

緣由2:完善的生態圈-fullstack安全

目前,Spark已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目。數據結構

Spark Core:實現了 Spark 的基本功能,包含RDD、任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。框架

Spark SQL:Spark 用來操做結構化數據的程序包。經過 Spark SQL,咱們可使用 SQL操做數據。

Spark Streaming:Spark 提供的對實時數據進行流式計算的組件。提供了用來操做數據流的 API。

Spark MLlib:提供常見的機器學習(ML)功能的程序庫。包括分類、迴歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的支持功能。

GraphX(圖計算):Spark中用於圖計算的API,性能良好,擁有豐富的功能和運算符,能在海量數據上自如地運行復雜的圖算法。

集羣管理器:Spark 設計爲能夠高效地在一個計算節點到數千個計算節點之間伸縮計算。

StructuredStreaming:處理結構化流,統一了離線和實時的API。

Spark VS Hadoop

Hadoop Spark
類型 基礎平臺, 包含計算, 存儲, 調度 分佈式計算工具
場景 大規模數據集上的批處理 迭代計算, 交互式計算, 流計算
價格 對機器要求低, 便宜 對內存有要求, 相對較貴
編程範式 Map+Reduce, API 較爲底層, 算法適應性差 RDD組成DAG有向無環圖, API 較爲頂層, 方便使用
數據存儲結構 MapReduce中間計算結果存在HDFS磁盤上, 延遲大 RDD中間運算結果存在內存中 , 延遲小
運行方式 Task以進程方式維護, 任務啓動慢 Task以線程方式維護, 任務啓動快

💖注意:
儘管Spark相對於Hadoop而言具備較大優點,但Spark並不能徹底替代Hadoop,Spark主要用於替代Hadoop中的MapReduce計算模型。存儲依然可使用HDFS,可是中間結果能夠存放在內存中;調度可使用Spark內置的,也可使用更成熟的調度系統YARN等。
實際上,Spark已經很好地融入了Hadoop生態圈,併成爲其中的重要一員,它能夠藉助於YARN實現資源調度管理,藉助於HDFS實現分佈式存儲。
此外,Hadoop可使用廉價的、異構的機器來作分佈式存儲與計算,可是,Spark對硬件的要求稍高一些,對內存與CPU有必定的要求。

Spark Core

1、RDD詳解

1. 爲何要有RDD?

在許多迭代式算法(好比機器學習、圖算法等)和交互式數據挖掘中,不一樣計算階段之間會重用中間結果,即一個階段的輸出結果會做爲下一個階段的輸入。可是,以前的MapReduce框架採用非循環式的數據流模型,把中間結果寫入到HDFS中,帶來了大量的數據複製、磁盤IO和序列化開銷。且這些框架只能支持一些特定的計算模式(map/reduce),並無提供一種通用的數據抽象。

AMP實驗室發表的一篇關於RDD的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是爲了解決這些問題的。

RDD提供了一個抽象的數據模型,讓咱們沒必要擔憂底層數據的分佈式特性,只需將具體的應用邏輯表達爲一系列轉換操做(函數),不一樣RDD之間的轉換操做之間還能夠造成依賴關係,進而實現管道化,從而避免了中間結果的存儲,大大下降了數據複製、磁盤IO和序列化開銷,而且還提供了更多的API(map/reduec/filter/groupBy...)。

2. RDD是什麼?

RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,表明一個不可變、可分區、裏面的元素可並行計算的集合。
單詞拆解:

  • Resilient :它是彈性的,RDD裏面的中的數據能夠保存在內存中或者磁盤裏面
  • Distributed :它裏面的元素是分佈式存儲的,能夠用於分佈式計算
  • Dataset: 它是一個集合,能夠存放不少元素

3. RDD主要屬性

進入RDD的源碼中看下:

RDD源碼

在源碼中能夠看到有對RDD介紹的註釋,咱們來翻譯下:

  1. A list of partitions
    一組分片(Partition)/一個分區(Partition)列表,即數據集的基本組成單位。
    對於RDD來講,每一個分片都會被一個計算任務處理,分片數決定並行度。
    用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。

  2. A function for computing each split
    一個函數會被做用在每個分區。
    Spark中RDD的計算是以分片爲單位的,compute函數會被做用到每一個分區上。

  3. A list of dependencies on other RDDs
    一個RDD會依賴於其餘多個RDD。
    RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。(Spark的容錯機制)

  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    可選項,對於KV類型的RDD會有一個Partitioner,即RDD的分區函數,默認爲HashPartitioner。

  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    可選項,一個列表,存儲存取每一個Partition的優先位置(preferred location)。
    對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照"移動數據不如移動計算"的理念,Spark在進行任務調度的時候,會盡量選擇那些存有數據的worker節點來進行任務計算。

總結

RDD 是一個數據集的表示,不只表示了數據集,還表示了這個數據集從哪來,如何計算,主要屬性包括:

  1. 分區列表
  2. 計算函數
  3. 依賴關係
  4. 分區函數(默認是hash)
  5. 最佳位置

分區列表、分區函數、最佳位置,這三個屬性其實說的就是數據集在哪,在哪計算更合適,如何分區;
計算函數、依賴關係,這兩個屬性其實說的是數據集怎麼來的。

2、RDD-API

1. RDD的建立方式

  1. 由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等:
    val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

  2. 經過已有的RDD通過算子轉換生成新的RDD:
    val rdd2=rdd1.flatMap(_.split(" "))

  3. 由一個已經存在的Scala集合建立:
    val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    或者
    val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD方法底層調用了parallelize方法:

RDD源碼

2. RDD的算子分類

RDD的算子分爲兩類:

  1. Transformation轉換操做:返回一個新的RDD
  2. Action動做操做:返回值不是RDD(無返回值或返回其餘的)

❣️注意:
一、RDD不實際存儲真正要計算的數據,而是記錄了數據的位置在哪裏,數據的轉換關係(調用了什麼方法,傳入什麼函數)。
二、RDD中的全部轉換都是惰性求值/延遲執行的,也就是說並不會直接計算。只有當發生一個要求返回結果給Driver的Action動做時,這些轉換纔會真正運行。
三、之因此使用惰性求值/延遲執行,是由於這樣能夠在Action時對RDD操做造成DAG有向無環圖進行Stage的劃分和並行優化,這種設計讓Spark更加有效率地運行。

3. Transformation轉換算子

轉換算子 含義
map(func) 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成
filter(func) 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成
flatMap(func) 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
mapPartitions(func) 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 對PairRDD中相同的Key值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。和aggregate函數相似,aggregateByKey返回值的類型不須要和RDD中value的類型一致
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey相似,可是更靈活
join(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable ,Iterable ))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 對rdd進行管道操做
coalesce(numPartitions) 減小 RDD 的分區數到指定值。在過濾大量數據以後,能夠執行此操做
repartition(numPartitions) 從新給 RDD 分區

4. Action動做算子

動做算子 含義
reduce(func) 經過func函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的
collect() 在驅動程序中,以數組的形式返回數據集的全部元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(相似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]) 返回天然順序或者自定義順序的前 n 個元素
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統
saveAsObjectFile(path) 將數據集的元素,以 Java 序列化的方式保存到指定的目錄下
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數
foreach(func) 在數據集的每個元素上,運行函數func進行更新
foreachPartition(func) 在數據集的每個分區上,運行函數func

統計操做:

算子 含義
count 個數
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 從採樣中計算方差
stdev 標準差:衡量數據的離散程度
sampleStdev 採樣的標準差
stats 查看統計結果

3、RDD的持久化/緩存

在實際開發中某些RDD的計算或轉換可能會比較耗費時間,若是這些RDD後續還會頻繁的被使用到,那麼能夠將這些RDD進行持久化/緩存,這樣下次再使用到的時候就不用再從新計算了,提升了程序運行的效率。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀取HDFS的文件,rdd2會真正執行持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀緩存中的數據,執行速度會比以前快,由於rdd2已經持久化到內存中了

持久化/緩存API詳解

  • ersist方法和cache方法

RDD經過persist或cache方法能夠將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看RDD的源碼發現cache最終也是調用了persist無參方法(默認存儲只存在內存中):

RDD源碼

  • 存儲級別

默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

持久化級別 說明
MORY_ONLY(默認) 將RDD以非序列化的Java對象存儲在JVM中。 若是沒有足夠的內存存儲RDD,則某些分區將不會被緩存,每次須要時都會從新計算。 這是默認級別
MORY_AND_DISK(開發中可使用這個) 將RDD以非序列化的Java對象存儲在JVM中。若是數據在內存中放不下,則溢寫到磁盤上.須要時則會從磁盤上讀取
MEMORY_ONLY_SER (Java and Scala) 將RDD以序列化的Java對象(每一個分區一個字節數組)的方式存儲.這一般比非序列化對象(deserialized objects)更具空間效率,特別是在使用快速序列化的狀況下,可是這種方式讀取數據會消耗更多的CPU
MEMORY_AND_DISK_SER (Java and Scala) 與MEMORY_ONLY_SER相似,但若是數據在內存中放不下,則溢寫到磁盤上,而不是每次須要從新計算它們
DISK_ONLY 將RDD分區存儲在磁盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 與上面的儲存級別相同,只不過將持久化數據存爲兩份,備份每一個分區存儲在兩個集羣節點上
OFF_HEAP(實驗中) 與MEMORY_ONLY_SER相似,但將數據存儲在堆外內存中。 (即不是直接存儲在JVM內存中)

總結:

  1. RDD持久化/緩存的目的是爲了提升後續操做的速度
  2. 緩存的級別有不少,默認只存在內存中,開發中使用memory_and_disk
  3. 只有執行action操做的時候纔會真正將RDD數據進行持久化/緩存
  4. 實際開發中若是某一個RDD後續會被頻繁的使用,能夠將該RDD進行持久化/緩存

4、RDD容錯機制Checkpoint

  • 持久化的侷限:

持久化/緩存能夠把數據放在內存中,雖然是快速的,可是也是最不可靠的;也能夠把數據放在磁盤上,也不是徹底可靠的!例如磁盤會損壞等。

  • 問題解決:

Checkpoint的產生就是爲了更加可靠的數據持久化,在Checkpoint的時候通常把數據放在在HDFS上,這就自然的藉助了HDFS天生的高容錯、高可靠來實現數據最大程度上的安全,實現了RDD的容錯和高可用。

用法

SparkContext.setCheckpointDir("目錄") //HDFS的目錄

RDD.checkpoint
  • 總結:

  • 開發中如何保證數據的安全性性及讀取效率:
    能夠對頻繁使用且重要的數據,先作緩存/持久化,再作checkpint操做。

  • 持久化和Checkpoint的區別:

  1. 位置:
    Persist 和 Cache 只能保存在本地的磁盤和內存中(或者堆外內存--實驗中)
    Checkpoint 能夠保存數據到 HDFS 這類可靠的存儲上。

  2. 生命週期:
    Cache和Persist的RDD會在程序結束後會被清除或者手動調用unpersist方法
    Checkpoint的RDD在程序結束後依然存在,不會被刪除。

5、RDD依賴關係

1. 寬窄依賴

  • 兩種依賴關係類型
    RDD和它依賴的父RDD的關係有兩種不一樣的類型,即
    寬依賴(wide dependency/shuffle dependency)
    窄依賴(narrow dependency)

  • 圖解:

寬窄依賴

  • 如何區分寬窄依賴:

窄依賴:父RDD的一個分區只會被子RDD的一個分區依賴;
寬依賴:父RDD的一個分區會被子RDD的多個分區依賴(涉及到shuffle)。

2. 爲何要設計寬窄依賴

  1. 對於窄依賴:

窄依賴的多個分區能夠並行計算;
窄依賴的一個分區的數據若是丟失只須要從新計算對應的分區的數據就能夠了。

  1. 對於寬依賴:

劃分Stage(階段)的依據:對於寬依賴,必須等到上一階段計算完成才能計算下一階段。

6、DAG的生成和劃分Stage

1. DAG介紹

  • DAG是什麼:

DAG(Directed Acyclic Graph有向無環圖)指的是數據轉換執行的過程,有方向,無閉環(其實就是RDD執行的流程);
原始的RDD經過一系列的轉換操做就造成了DAG有向無環圖,任務執行時,能夠按照DAG的描述,執行真正的計算(數據被操做的一個過程)。

  • DAG的邊界

開始:經過SparkContext建立的RDD;
結束:觸發Action,一旦觸發Action就造成了一個完整的DAG。

2.DAG劃分Stage

DAG劃分Stage

一個Spark程序能夠有多個DAG(有幾個Action,就有幾個DAG,上圖最後只有一個Action(圖中未表現),那麼就是一個DAG)

一個DAG能夠有多個Stage(根據寬依賴/shuffle進行劃分)。

同一個Stage能夠有多個Task並行執行(task數=分區數,如上圖,Stage1 中有三個分區P一、P二、P3,對應的也有三個 Task)。

能夠看到這個DAG中只reduceByKey操做是一個寬依賴,Spark內核會以此爲邊界將其先後劃分紅不一樣的Stage。

同時咱們能夠注意到,在圖中Stage1中,從textFile到flatMap到map都是窄依賴,這幾步操做能夠造成一個流水線操做,經過flatMap操做生成的partition能夠不用等待整個RDD計算結束,而是繼續進行map操做,這樣大大提升了計算的效率

  • 爲何要劃分Stage? --並行計算

一個複雜的業務邏輯若是有shuffle,那麼就意味着前面階段產生結果後,才能執行下一個階段,即下一個階段的計算要依賴上一個階段的數據。那麼咱們按照shuffle進行劃分(也就是按照寬依賴就行劃分),就能夠將一個DAG劃分紅多個Stage/階段,在同一個Stage中,會有多個算子操做,能夠造成一個pipeline流水線,流水線內的多個平行的分區能夠並行執行。

  • 如何劃分DAG的stage?

對於窄依賴,partition的轉換處理在stage中完成計算,不劃分(將窄依賴儘可能放在在同一個stage中,能夠實現流水線計算)。

對於寬依賴,因爲有shuffle的存在,只能在父RDD處理完成後,才能開始接下來的計算,也就是說須要要劃分stage。

總結:

Spark會根據shuffle/寬依賴使用回溯算法來對DAG進行Stage劃分,從後往前,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的stage/階段中

具體的劃分算法請參見AMP實驗室發表的論文:
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se


文章推薦
Spark底層執行原理詳細解析

相關文章
相關標籤/搜索