DAG每一個節點表明啥?表明的一個RDD緩存
這裏再次複習RDD的5大特性網絡
2.併發
RDD2 = RDD1.filter(xxxxx)ide
RDD3 = RDD1.filter(yyyy)函數
是從RDD1到RDD2,RDD3這樣的過程spa
則是RDD2 RDD3變成RDD4的過程線程
是從RDD4到RDD5這樣的過程scala
上述都是transformation設計
RDD5.collect(); //action3d
RDD5.foreach(); //action
這種則會生成兩個job,會順序提交,前一個job執行結束以後纔會提交下一個job(假設上述代碼都在一個線程中)
RDD依賴關係,也就是有依賴的RDD之間的關係,好比RDD1------->RDD2(RDD1生成RDD2),RDD2依賴於RDD1。這裏的生成也就是RDDtransformation操做
窄依賴(也叫narrow依賴)
從父RDD角度看:一個父RDD只被一個子RDD分區使用。父RDD的每一個分區最多隻能被一個Child RDD的一個分區使用
從子RDD角度看:依賴上級RDD的部分分區 精確知道依賴的上級RDD分區,會選擇和本身在同一節點的上級RDD分區,沒有網絡IO開銷,高效。如map,flatmap,filter
寬依賴(也叫shuffle依賴/wide依賴)
從父RDD角度看:一個父RDD被多個子RDD分區使用。父RDD的每一個分區能夠被多個Child RDD分區依賴
從子RDD角度看:依賴上級RDD的全部分區 沒法精肯定位依賴的上級RDD分區,至關於依賴全部分區(例如reduceByKey) 計算就涉及到節點間網絡傳輸
父分區,都只有一根箭頭 父分區,都有多個箭頭
子分區,來自部分父分區 子分區,來自所有父分區
Spark之因此將依賴分爲narrow和 shuffle:
(1) narrow dependencies能夠支持在同一個集羣Executor上,以pipeline管道形式順序執行多條命令,例如在執行了map後,緊接着執行filter。分區內的計算收斂,不須要依賴全部分區的數據,能夠並行地在不一樣節點進行計算。因此它的失敗恢復也更有效,由於它只須要從新計算丟失的parent partition便可,
(2)shuffle dependencies 則須要全部的父分區都是可用的,必須等RDD的parent partition數據所有ready以後才能開始計算,可能還須要調用相似MapReduce之類的操做進行跨節點傳遞。從失敗恢復的角度看,shuffle dependencies 牽涉RDD各級的多個parent partition。
如圖所示,左邊的都是右邊的父分區
因爲shuffle依賴必須等RDD的parent RDD partition數據所有ready以後才能開始計算,所以spark的設計是讓parent RDD將結果寫在本地,徹底寫完以後,通知後面的RDD。後面的RDD則首先去讀以前的本地數據做爲input,而後進行運算。
因爲上述特性,將shuffle依賴就必須分爲兩個階段(stage)去作:
第一個階段(stage)須要把結果shuffle到本地,例如reduceByKey,首先要聚合某個key的全部記錄,才能進行下一步的reduce計算,這個匯聚的過程就是shuffle
第二個階段(stage)則讀入數據進行處理。
同一個stage裏面的task是能夠併發執行的,下一個stage要等前一個stage ready
(和mapreduce的reduce須要等map過程ready 一脈相承)
(爲何要寫在本地:後面的RDD多個partition都要去讀這個信息,若是放到內存,若是出現數據丟失,後面的全部步驟所有不能進行,違背了以前所說的須要parent RDD partition數據所有ready的原則。爲何要保證parent RDD要ready,以下例,若是有一個partition未生成或者在內存中丟失,那麼直接致使計算結果是徹底錯誤的:
寫到文件中更加可靠。Shuffle會生成大量臨時文件,以避免錯誤時從新計算,其使用的本地磁盤目錄由spark.local.dir指定,緩存到磁盤的RDD數據。最好將這個屬性設定爲訪問速度快的本地磁盤。能夠配置多個路徑到多個磁盤,增長IO帶寬
在Spark 1.0 之後,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)參數會覆蓋這個配置。好比Spark On YARN的時候,Spark Executor的本地路徑依賴於Yarn的配置,而不取決於這個參數。)
對於transformation操做,以shuffle依賴爲分隔,分爲不一樣的Stages。
窄依賴------>tasks會歸併在同一個stage中,(相同節點上的task運算能夠像pipeline同樣順序執行,不一樣節點並行計算,互不影響)
shuffle依賴------>先後拆分爲兩個stage,前一個stage寫完文件後下一個stage才能開始
action操做------>和其餘tasks會歸併在同一個stage(在沒有shuffle依賴的狀況下,生成默認的stage,保證至少一個stage)
例一:
def main(args: Array[String]): Unit = { val sp = new SparkConf(); sp.setAppName("zhangzeli") sp.setMaster("local") val sc = new SparkContext(sp); val rdd =sc.parallelize(Array(1,2,3,4));//由於分的資源是兩個核,因此默認設置爲兩個partition val cont = rdd.count(); while (true){} }
Count是一個action操做。一個action會觸發一個job,Count()這個action在整個job沒有stage的狀況下會生成一個默認的stage
結果:一個job,一個stage,兩個task(由於有兩個partition)
例二:
最終這個生成一個job,由於reducebykey是shuffle依賴,因此這裏劃分爲兩個stage
parallelize和map被分在一塊兒,爲stage0,map最後進行了ShuffleWrite
reduceByKey和count()被劃分到一個stage1裏面了,最開始要進行shuffle read
Stage0的tasks以下圖,兩個partitions(兩個tasks)都進行了shuffle write。兩個task互相獨立,並不須要依賴彼此作完或者怎樣,因此他們在一個stage裏面併發執行
Stage1的tasks以下:Stage1是依賴以前的stage0完成shuffle的,reduceByKey開始須要ShuffleRead stage0的計算結果
若是後面還有其餘操做,這些操做是要等上面這個shuffle執行完的 reduceByKey 則在下一階段,shuffleRead讀到數據 因此根據shuffle依賴必須分爲多個stage 但一個stage內部,多個task(partition)是獨立併發執行的,互不打擾