Spark DAG概述

1、 DAG定義

DAG每一個節點表明啥?表明的一個RDD緩存

這裏再次複習RDD的5大特性網絡

  • 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
  • 一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
  • RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
  • 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
  • 一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

2.張澤立併發

  • 一個RDD生成兩個RDD:

        RDD2 = RDD1.filter(xxxxx)ide

        RDD3 = RDD1.filter(yyyy)函數

       是從RDD1到RDD2,RDD3這樣的過程spa

  • Union是兩個RDD合併成一個的過程   

        則是RDD2 RDD3變成RDD4的過程線程

  • filter/map/reduceByKey 應該都是一條直線

        是從RDD4到RDD5這樣的過程scala

        上述都是transformation設計

        RDD5.collect();  //action3d

        RDD5.foreach();  //action

    這種則會生成兩個job,會順序提交,前一個job執行結束以後纔會提交下一個job(假設上述代碼都在一個線程中)

RDD依賴關係

RDD依賴關係,也就是有依賴的RDD之間的關係,好比RDD1------->RDD2(RDD1生成RDD2),RDD2依賴於RDD1。這裏的生成也就是RDDtransformation操做

窄依賴(也叫narrow依賴)

從父RDD角度看:一個父RDD只被一個子RDD分區使用。父RDD的每一個分區最多隻能被一個Child RDD的一個分區使用

從子RDD角度看:依賴上級RDD的部分分區     精確知道依賴的上級RDD分區,會選擇和本身在同一節點的上級RDD分區,沒有網絡IO開銷,高效。如map,flatmap,filter

寬依賴(也叫shuffle依賴/wide依賴)

從父RDD角度看:一個父RDD被多個子RDD分區使用。父RDD的每一個分區能夠被多個Child RDD分區依賴

從子RDD角度看:依賴上級RDD的全部分區     沒法精肯定位依賴的上級RDD分區,至關於依賴全部分區(例如reduceByKey)  計算就涉及到節點間網絡傳輸  

張澤立

                        父分區,都只有一根箭頭                         父分區,都有多個箭頭

                        子分區,來自部分父分區                         子分區,來自所有父分區

Spark之因此將依賴分爲narrow和 shuffle:

(1) narrow dependencies能夠支持在同一個集羣Executor上,以pipeline管道形式順序執行多條命令,例如在執行了map後,緊接着執行filter。分區內的計算收斂,不須要依賴全部分區的數據,能夠並行地在不一樣節點進行計算。因此它的失敗恢復也更有效,由於它只須要從新計算丟失的parent partition便可,

(2)shuffle dependencies 則須要全部的父分區都是可用的,必須等RDD的parent partition數據所有ready以後才能開始計算,可能還須要調用相似MapReduce之類的操做進行跨節點傳遞。從失敗恢復的角度看,shuffle dependencies 牽涉RDD各級的多個parent partition。
 

如圖所示,左邊的都是右邊的父分區

劃分stage

因爲shuffle依賴必須等RDD的parent RDD partition數據所有ready以後才能開始計算,所以spark的設計是讓parent RDD將結果寫在本地,徹底寫完以後,通知後面的RDD。後面的RDD則首先去讀以前的本地數據做爲input,而後進行運算。

因爲上述特性,將shuffle依賴就必須分爲兩個階段(stage)去作:

第一個階段(stage)須要把結果shuffle到本地,例如reduceByKey,首先要聚合某個key的全部記錄,才能進行下一步的reduce計算,這個匯聚的過程就是shuffle

第二個階段(stage)則讀入數據進行處理。

同一個stage裏面的task是能夠併發執行的,下一個stage要等前一個stage ready

(和mapreduce的reduce須要等map過程ready 一脈相承)

(爲何要寫在本地:後面的RDD多個partition都要去讀這個信息,若是放到內存,若是出現數據丟失,後面的全部步驟所有不能進行,違背了以前所說的須要parent RDD partition數據所有ready的原則。爲何要保證parent RDD要ready,以下例,若是有一個partition未生成或者在內存中丟失,那麼直接致使計算結果是徹底錯誤的:

寫到文件中更加可靠。Shuffle會生成大量臨時文件,以避免錯誤時從新計算,其使用的本地磁盤目錄由spark.local.dir指定,緩存到磁盤的RDD數據。最好將這個屬性設定爲訪問速度快的本地磁盤。能夠配置多個路徑到多個磁盤,增長IO帶寬

在Spark 1.0 之後,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)參數會覆蓋這個配置。好比Spark On YARN的時候,Spark Executor的本地路徑依賴於Yarn的配置,而不取決於這個參數。)

對於transformation操做,以shuffle依賴爲分隔,分爲不一樣的Stages。

窄依賴------>tasks會歸併在同一個stage中,(相同節點上的task運算能夠像pipeline同樣順序執行,不一樣節點並行計算,互不影響)

shuffle依賴------>先後拆分爲兩個stage,前一個stage寫完文件後下一個stage才能開始

action操做------>和其餘tasks會歸併在同一個stage(在沒有shuffle依賴的狀況下,生成默認的stage,保證至少一個stage)

小實驗驗證

例一:

def main(args: Array[String]): Unit = {
    val sp = new SparkConf();
    sp.setAppName("zhangzeli")
    sp.setMaster("local")
    val sc = new SparkContext(sp);
    val rdd =sc.parallelize(Array(1,2,3,4));//由於分的資源是兩個核,因此默認設置爲兩個partition
    val cont = rdd.count();
    while (true){}

  }

Count是一個action操做。一個action會觸發一個job,Count()這個action在整個job沒有stage的狀況下會生成一個默認的stage

結果:一個job,一個stage,兩個task(由於有兩個partition)

例二:

最終這個生成一個job,由於reducebykey是shuffle依賴,因此這裏劃分爲兩個stage

parallelize和map被分在一塊兒,爲stage0,map最後進行了ShuffleWrite

reduceByKey和count()被劃分到一個stage1裏面了,最開始要進行shuffle read
Stage0的tasks以下圖,兩個partitions(兩個tasks)都進行了shuffle write。兩個task互相獨立,並不須要依賴彼此作完或者怎樣,因此他們在一個stage裏面併發執行

Stage1的tasks以下:Stage1是依賴以前的stage0完成shuffle的,reduceByKey開始須要ShuffleRead stage0的計算結果

若是後面還有其餘操做,這些操做是要等上面這個shuffle執行完的 reduceByKey 則在下一階段,shuffleRead讀到數據 因此根據shuffle依賴必須分爲多個stage 但一個stage內部,多個task(partition)是獨立併發執行的,互不打擾

相關文章
相關標籤/搜索