Spark Streaming源碼解析之DAG定義

此文是從思惟導圖中導出稍做調整後生成的,思惟腦圖對代碼瀏覽支持不是很好,爲了更好閱讀體驗,文中涉及到的源碼都是刪除掉沒必要要的代碼後的僞代碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀:git

此博文共分爲四個部分:github

  1. imgDAG定義
  2. imgJob動態生成
  3. img數據的產生與導入
  4. img容錯

img

img

1. DStream

img

1.1. RDD

img

DStream和RDD關係:app

DStream is a continuous sequence of RDDs:
generatedRDDs=new HashMap[Time,RDD[T]]()

1.1.1. 存儲

存儲格式socket

DStream內部經過一個HashMap的變量generatedRDD來記錄生成的RDD:ide

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

其中 :函數

- key: time是生成當前batch的時間戳oop

- value: 生成的RDD實例this

每個不一樣的 DStream 實例,都有一個本身的 generatedRDD,即每一個轉換操做的結果都會保留spa

1.1.2. 獲取

img

1.1.2.1. getOrCompute

img

  1. 從rdd的map中獲取:generatedRDDs.get(time).orElsescala

  2. map中沒有則計算:val newRDD=compute(time)

  3. 將計算的newRDD放入map中:generatedRDDs.put(time, newRDD)

其中compute方法有如下特色:

  • 不一樣DStream的計算方式不一樣

  • inputStream會對接對應數據源的API

  • transformStream會從父依賴中去獲取RDD並進行轉換得新的DStream

compute方法實現:

class ReceiverInputDStream{

 override def compute(validTime: Time): Option[RDD[T]] = {

    val blockRDD = {
 
      if (validTime < graph.startTime) {

        
        // If this is called for any time before the start time of the context,

        // then this returns an empty RDD. This may happen when recovering from a

        // driver failure without any write ahead log to recover pre-failure data.

        new BlockRDD[T](ssc.sc, Array.empty)

      } else {

        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream

        // for this batch

        val receiverTracker = ssc.scheduler.receiverTracker

        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

 
        // Register the input blocks information into InputInfoTracker

        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)

        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

 
        // Create the BlockRDD

        createBlockRDD(validTime, blockInfos)

      }

    }

    Some(blockRDD)

  }

1.1.3. 生成

RDD主要分爲如下三個過程:InputStream -> TransFormationStream -> OutputStream

img

1.1.3.1. InputStream

inputstream包括FileInputStream,KafkaInputStream等等

img

1.1.3.1.1. FileInputStream

FileInputStream的生成步驟:

img

  1. 找到新產生的文件:val newFiles = findNewFiles(validTime.milliseconds)

  2. 將newFiles轉換爲RDDs:val rdds=filesToRDD(newFiles)

2.1. 遍歷文件列表獲取生成RDD: val fileRDDs=files.map(file=>newAPIHadoop(file))

2.2. 將每一個文件的RDD進行合併並返回:return new UnionRDD(fileRDDs)

  1. 返回生成的rdds

1.1.3.2. TransformationStream

img

RDD的轉換實現:

  1. 獲取parent DStream:val parentDs=parent.getOrCompute(validTime)
  2. 執行轉換函數並返回轉換結果:return parentDs.map(mapFunc)

轉換類的DStream實現特色

  • 傳入parent DStream和轉換函數

  • compute方法中從parent DStream中獲取DStream並對其做用轉換函數

private[streaming]

class MappedDStream[T: ClassTag, U: ClassTag] (
   parent: DStream[T],
   mapFunc: T => U
 ) extends DStream[U](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)
 override def slideDuration: Duration = parent.slideDuration
 override def compute(validTime: Time): Option[RDD[U]] = {
   parent.getOrCompute(validTime).map(_.map[U](mapFunc))
 }
}

不一樣DStream的getOrCompute方法實現:

  • FilteredDStream:parent.getOrCompute(validTime).map(_.filter(filterFunc)
  • FlatMapValuedDStream:parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)
  • MappedDStream:parent.getOrCompute(validTime).map(_.map[U](mapFunc))

在最開始, DStream 的 transformation 的 API 設計與 RDD 的 transformation 設計保持了一致,就使得,每個 dStreamA.transformation() 獲得的新 dStreamB 能將 dStreamA.transformation() 操做完美複製爲每一個 batch 的 rddA.transformation() 操做。這也就是 DStream 可以做爲 RDD 模板,在每一個 batch 裏實例化 RDD 的根本緣由。

1.1.3.3. OutputDStream

OutputDStream的操做最後都轉換到ForEachDStream(),ForeachDStream中會生成Job並返回。

僞代碼

def generateJob(time:Time){
	val jobFunc=()=>crateRDD{
		foreachFunc(rdd,time)
	}
	Some(new Job(time,jobFunc))
}

源碼

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time) {
          ssc.sparkContext.setCallSite(creationSite)
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
        case None => None
    }
  }
}

經過對output stream節點進行遍歷,就能夠獲得全部上游依賴的DStream,直至找到沒有父依賴的inputStream。

1.2. 特徵

DStream基本屬性:

  • 父依賴: dependencies: List[DStream[_]]

  • 時間間隔:slideDuration:Duration

  • 生成RDD的函數:compute

1.3. 實現類

DStream的實現類可分爲三種:輸入,轉換和輸出

img

DStream之間的轉換相似於RDD之間的轉換,對於wordCount的例子,實現代碼:

val lines=ssc.socketTextStream(ip,port)
val worlds=lines.flatMap(_.split("_"))
val pairs=words.map(word=>(word,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()

每一個函數的返回對象用具體實現代替:

val lines=new SocketInputDStream(ip,port)
val words=new FlatMappedDStream(lines,_.split("_"))
val pairs=new MappedDStream(words,word=>(word,1))
val wordCounts=new ShuffledDStream(pairs,_+_)
new ForeachDStream(wordCounts,cnt=>cnt.print())

1.3.1. ForeachDStream

DStream的實現分爲兩種,transformation和output

不一樣的轉換操做有其對應的DStream實現,全部的output操做只對應於ForeachDStream

1.3.2. Transformed DStream

img

1.3.3. InputDStream

img

2. DStreamGraph

img

2.1 DAG分類

  • 邏輯DAG: 經過transformation操做正向生成

  • 物理DAG: 惰性求值的緣由,在遇到output操做時根據dependency逆向寬度優先遍歷求值。

2.2 DAG生成

DStreamGraph屬性

inputStreams=new ArrayBuffer[InputDStream[_]]()
 outputStreams=new ArrayBuffer[DStream[_]]()

DAG實現過程

​ 經過對output stream節點進行遍歷,就能夠獲得全部上游依賴的DStream,直至找到沒有父依賴的inputStream。

​ sparkStreaming 記錄整個DStream DAG的方式就是經過一個DStreamGraph 實例記錄了到全部output stream節點的引用

generateJobs

def generateJobs(time: Time): Seq[Job] = {
      val jobs = this.synchronized {
      outputStreams.flatMap { 
		outputStream =>
        val jobOption = 
			// 調用了foreachDStream來生成每一個job
			outputStream.generateJob(time)   jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
	// 返回生成的Job列表
    jobs
  }

腦圖製做參考https://github.com/lw-lin/CoolplaySpark

完整腦圖連接地址https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png

相關文章
相關標籤/搜索