RDD的依賴關係分爲兩種:窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies,源碼中稱爲Shuffle Dependencies)數據庫
依賴有2個做用,其一用來解決數據容錯的高效性;其二用來劃分stage。數組
窄依賴:每一個父RDD的一個Partition最多被子RDD的一個Partition所使用(1:1 或 n:1)。例如map、filter、union等操做都會產生窄依賴;緩存
子RDD分區一般對應常數個父RDD分區(O(1),與數據規模無關。網絡
寬依賴:一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操做都會產生寬依賴;(1:m 或 n:m)框架
(子RDD分區一般對應全部的父RDD分區(O(n),與數據規模有關)分佈式
相比於寬依賴,窄依賴對優化頗有利 ,主要基於如下兩點:ide
1、寬依賴每每對應着shuffle操做,須要在運行過程當中將同一個父RDD的分區傳入到不一樣的子RDD分區中,中間可能涉及多個節點之間的數據傳輸;而窄依賴的每一個父RDD的分區只會傳入到一個子RDD分區中,一般能夠在一個節點內完成轉換。函數
2、當RDD分區丟失時(某個節點故障),spark會對數據進行重算。oop
Ø 對於窄依賴,因爲父RDD的一個分區只對應一個子RDD分區,這樣只須要重算和子RDD分區對應的父RDD分區便可,因此這個重算對數據的利用率是100%的;性能
Ø 對於寬依賴,重算的父RDD分區對應多個子RDD分區,這樣實際上父RDD 中只有一部分的數據是被用於恢復這個丟失的子RDD分區的,另外一部分對應子RDD的其它未丟失分區,這就形成了多餘的計算;更通常的,寬依賴中子RDD分區一般來自多個父RDD分區,極端狀況下,全部的父RDD分區都要進行從新計算。
Ø 以下圖所示,b1分區丟失,則須要從新計算a1,a2和a3,這就產生了冗餘計算(a1,a2,a3中對應b2的數據)。
區分這兩種依賴頗有用。首先,窄依賴容許在一個集羣節點上以流水線的方式(pipeline)計算全部父分區。例如,逐個元素地執行map、而後filter操做;而寬依賴則須要首先計算好全部父分區數據,而後在節點之間進行Shuffle,這與MapReduce相似。第二,窄依賴可以更有效地進行失效節點的恢復,即只需從新計算丟失RDD分區的父分區,並且不一樣節點之間能夠並行計算;而對於一個寬依賴關係的Lineage圖,單個節點失效可能致使這個RDD的全部祖先丟失部分分區,於是須要總體從新計算。
【誤解】以前一直理解錯了,覺得窄依賴中每一個子RDD可能對應多個父RDD,當子RDD丟失時會致使多個父RDD進行從新計算,因此窄依賴不如寬依賴有優點。而實際上應該深刻到分區級別去看待這個問題,並且重算的效用也不在於算的多少,而在於有多少是冗餘的計算。窄依賴中須要重算的都是必須的,因此重算不冗餘。
窄依賴的函數有:map、filter、union、join(父RDD是hash-partitioned )、mapPartitions、mapValues
寬依賴的函數有:groupByKey、join(父RDD不是hash-partitioned )、partitionBy
依賴的繼承關係:
val rdd1 = sc.parallelize(1 to 10, 1)
val rdd2 = sc.parallelize(11 to 20, 1)
val rdd3 = rdd1.union(rdd2)
rdd3.dependencies.size
// 長度爲2,值爲rdd1、rdd2,意爲rdd3依賴rdd1、rdd2
rdd3.dependencies
// 結果:
rdd3.dependencies(0).rdd.collect
// 打印rdd1的數據
rdd3.dependencies(1).rdd.collect
// 打印rdd2的數據
rdd3.dependencies(3).rdd.collect
// 數組越界,報錯
哪些RDD Actions對應shuffleDependency?下面的join(r5)好像就沒有shuffleDependency
val r1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
val r2 = r1.keyBy(_.length)
val r3 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))
val r4 = r3.keyBy(_.length)
val r5 = r2.join(r4)
回答:join不必定會有shuffleDependency,上面的操做中就沒有。
redueceByKey會產生shuffleDependency。
注意上面操做中的keyBy,和個人想象不太同樣。要注意一下。
keyBy:與map操做較爲相似,給每一個元素增長了一個key
如下這個例子有點意思:
val r1 = sc.textFile("hdfs:///user/hadoop/data/block_test1.csv")
r1
val r2 = r1.dependencies(0).rdd
r2.partitions.size
r2.preferredLocations(r2.partitions(0))
r2.preferredLocations(r2.partitions(3))
有意思的地方在於(查找依賴、優先位置):
1、r1的類型爲MapPartitionsRDD
2、r1依賴於r2,若是沒有這個賦值語句是看不出來的。r2的類型爲:HadoopRDD
3、能夠檢索r2各個分區的位置,該hdfs文件系統的副本數設置爲2
通常來講,分佈式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新(CheckPoint Data,和Logging The Updates)。
面向大規模數據分析,數據檢查點操做成本很高,須要經過數據中心的網絡鏈接在機器之間複製龐大的數據集,而網絡帶寬每每比內存帶寬低得多,同時還須要消耗更多的存儲資源。
所以,Spark選擇記錄更新的方式。可是,若是更新粒度太細太多,那麼記錄更新成本也不低。所以,RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操做(記錄如何從其餘RDD轉換而來,即lineage),而後將建立RDD的一系列變換序列(每一個RDD都包含了他是如何由其餘RDD變換過來的以及如何重建某一塊數據的信息。所以RDD的容錯機制又稱「血統(Lineage)」容錯)記錄下來,以便恢復丟失的分區。
Lineage本質上很相似於數據庫中的重作日誌(Redo Log),只不過這個重作日誌粒度很大,是對全局數據作一樣的重作進而恢復數據。
Lineage容錯原理:在容錯機制中,若是一個節點死機了,並且運算窄依賴,則只要把丟失的父RDD分區重算便可,不依賴於其餘節點。而寬依賴須要父RDD的全部分區都存在,重算就很昂貴了。能夠這樣理解開銷的經濟與否:在窄依賴中,在子RDD的分區丟失、重算父RDD分區時,父RDD相應分區的全部數據都是子RDD分區的數據,並不存在冗餘計算。在寬依賴狀況下,丟失一個子RDD分區重算的每一個父RDD的每一個分區的全部數據並非都給丟失的子RDD分區用的,會有一部分數據至關於對應的是未丟失的子RDD分區中須要的數據,這樣就會產生冗餘計算開銷,這也是寬依賴開銷更大的緣由。所以若是使用Checkpoint算子來作檢查點,不只要考慮Lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加Checkpoint是最物有所值的。
Checkpoint機制。在如下2種狀況下,RDD須要加檢查點:
Ø DAG中的Lineage過長,若是重算,則開銷太大(如在屢次迭代中)
Ø 在寬依賴上作Checkpoint得到的收益更大
因爲RDD是隻讀的,因此Spark的RDD計算中一致性不是主要關心的內容,內存相對容易管理,這也是設計者頗有遠見的地方,這樣減小了框架的複雜性,提高了性能和可擴展性,爲之後上層框架的豐富奠基了強有力的基礎。
在RDD計算中,經過檢查點機制進行容錯,傳統作檢查點有兩種方式:經過冗餘數據和日誌記錄更新操做。在RDD中的doCheckPoint方法至關於經過冗餘數據來緩存數據,而以前介紹的血統就是經過至關粗粒度的記錄更新操做來實現容錯的。
檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,Lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。
1、從本質上說:checkpoint是容錯機制;cache是優化機制
2、checkpoint將數據寫到共享存儲中(hdfs);cache一般是內存中
3、運算時間很長或運算量太大才能獲得的 RDD,computing chain 過長或依賴其餘 RDD 不少的RDD,須要作checkpoint。會被重複使用的(但不能太大)RDD,作cache。
實際上,將 ShuffleMapTask 的輸出結果存放到本地磁盤也算是 checkpoint,只不過這個checkpoint 的主要目的是去 partition 輸出數據。
4、RDD 的checkpoint 操做完成後會斬斷lineage,cache操做對lineage沒有影響。
checkpoint 在 Spark Streaming中特別重要,spark streaming 中對於一些有狀態的操做,這在某些 stateful 轉換中是須要的,在這種轉換中,生成 RDD 須要依賴前面的 batches,會致使依賴鏈隨着時間而變長。爲了不這種沒有盡頭的變長,要按期將中間生成的 RDDs 保存到可靠存儲來切斷依賴鏈,必須隔一段時間進行一次checkpoint。
cache 和 checkpoint 是有顯著區別的,緩存把 RDD 計算出來而後放在內存中, 可是RDD 的依賴鏈(至關於數據庫中的redo 日誌),也不能丟掉,當某個點某個 executor 宕了,上面cache 的RDD就會丟掉,須要經過依賴鏈重放計算出來,不一樣的是,checkpoint 是把 RDD 保存在 HDFS中,是多副本可靠存儲,因此依賴鏈就能夠丟掉了,即斬斷了依賴鏈,是經過複製實現的高容錯。
注意:checkpoint須要把 job 從新從頭算一遍,最好先cache一下,checkpoint就能夠直接保存緩存中的 RDD 了,就不須要重頭計算一遍了,對性能有極大的提高。
checkpoint 的正確使用姿式
val data = sc.textFile("/tmp/spark/1.data").cache() // 注意要cache
sc.setCheckpointDir("/tmp/spark/checkpoint")
data.checkpoint
data.count
//問題:cache和checkpoint有沒有前後的問題;有了cache能夠避免第二次計算,我在代碼中能夠看見相關的說明!!!
使用很簡單, 就是設置一下 checkpoint 目錄,而後再rdd上調用 checkpoint 方法, action 的時候就對數據進行了 checkpoint
checkpoint 寫流程
RDD checkpoint 過程當中會通過如下幾個狀態,
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]