此文是從思惟導圖中導出稍做調整後生成的,思惟腦圖對代碼瀏覽支持不是很好,爲了更好閱讀體驗,文中涉及到的源碼都是刪除掉沒必要要的代碼後的僞代碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀:git
此博文共分爲四個部分:github
DStream和RDD關係:app
DStream is a continuous sequence of RDDs: generatedRDDs=new HashMap[Time,RDD[T]]()
存儲格式socket
DStream內部經過一個HashMap的變量generatedRDD來記錄生成的RDD:ide
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
其中 :函數
- key: time是生成當前batch的時間戳oop
- value: 生成的RDD實例this
每個不一樣的 DStream 實例,都有一個本身的 generatedRDD,即每一個轉換操做的結果都會保留spa
從rdd的map中獲取:generatedRDDs.get(time).orElsescala
map中沒有則計算:val newRDD=compute(time)
將計算的newRDD放入map中:generatedRDDs.put(time, newRDD)
其中compute方法有如下特色:
不一樣DStream的計算方式不一樣
inputStream會對接對應數據源的API
transformStream會從父依賴中去獲取RDD並進行轉換得新的DStream
compute方法實現:
class ReceiverInputDStream{ override def compute(validTime: Time): Option[RDD[T]] = { val blockRDD = { if (validTime < graph.startTime) { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // driver failure without any write ahead log to recover pre-failure data. new BlockRDD[T](ssc.sc, Array.empty) } else { // Otherwise, ask the tracker for all the blocks that have been allocated to this stream // for this batch val receiverTracker = ssc.scheduler.receiverTracker val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // Register the input blocks information into InputInfoTracker val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // Create the BlockRDD createBlockRDD(validTime, blockInfos) } } Some(blockRDD) }
RDD主要分爲如下三個過程:InputStream -> TransFormationStream -> OutputStream
inputstream包括FileInputStream,KafkaInputStream等等
FileInputStream的生成步驟:
找到新產生的文件:val newFiles = findNewFiles(validTime.milliseconds)
將newFiles轉換爲RDDs:val rdds=filesToRDD(newFiles)
2.1. 遍歷文件列表獲取生成RDD: val fileRDDs=files.map(file=>newAPIHadoop(file))
2.2. 將每一個文件的RDD進行合併並返回:return new UnionRDD(fileRDDs)
RDD的轉換實現:
轉換類的DStream實現特色:
傳入parent DStream和轉換函數
compute方法中從parent DStream中獲取DStream並對其做用轉換函數
private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } }
不一樣DStream的getOrCompute方法實現:
parent.getOrCompute(validTime).map(_.filter(filterFunc)
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
在最開始, DStream 的 transformation 的 API 設計與 RDD 的 transformation 設計保持了一致,就使得,每個 dStreamA.transformation() 獲得的新 dStreamB 能將 dStreamA.transformation() 操做完美複製爲每一個 batch 的 rddA.transformation() 操做。這也就是 DStream 可以做爲 RDD 模板,在每一個 batch 裏實例化 RDD 的根本緣由。
OutputDStream的操做最後都轉換到ForEachDStream(),ForeachDStream中會生成Job並返回。
僞代碼
def generateJob(time:Time){ val jobFunc=()=>crateRDD{ foreachFunc(rdd,time) } Some(new Job(time,jobFunc)) }
源碼
private[streaming] class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } } }
經過對output stream節點進行遍歷,就能夠獲得全部上游依賴的DStream,直至找到沒有父依賴的inputStream。
DStream基本屬性:
父依賴: dependencies: List[DStream[_]]
時間間隔:slideDuration:Duration
生成RDD的函數:compute
DStream的實現類可分爲三種:輸入,轉換和輸出
DStream之間的轉換相似於RDD之間的轉換,對於wordCount的例子,實現代碼:
val lines=ssc.socketTextStream(ip,port) val worlds=lines.flatMap(_.split("_")) val pairs=words.map(word=>(word,1)) val wordCounts=pairs.reduceByKey(_+_) wordCounts.print()
每一個函數的返回對象用具體實現代替:
val lines=new SocketInputDStream(ip,port) val words=new FlatMappedDStream(lines,_.split("_")) val pairs=new MappedDStream(words,word=>(word,1)) val wordCounts=new ShuffledDStream(pairs,_+_) new ForeachDStream(wordCounts,cnt=>cnt.print())
DStream的實現分爲兩種,transformation和output
不一樣的轉換操做有其對應的DStream實現,全部的output操做只對應於ForeachDStream
邏輯DAG: 經過transformation操做正向生成
物理DAG: 惰性求值的緣由,在遇到output操做時根據dependency逆向寬度優先遍歷求值。
DStreamGraph屬性
inputStreams=new ArrayBuffer[InputDStream[_]]() outputStreams=new ArrayBuffer[DStream[_]]()
DAG實現過程
經過對output stream節點進行遍歷,就能夠獲得全部上游依賴的DStream,直至找到沒有父依賴的inputStream。
sparkStreaming 記錄整個DStream DAG的方式就是經過一個DStreamGraph 實例記錄了到全部output stream節點的引用
generateJobs
def generateJobs(time: Time): Seq[Job] = { val jobs = this.synchronized { outputStreams.flatMap { outputStream => val jobOption = // 調用了foreachDStream來生成每一個job outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } // 返回生成的Job列表 jobs }
腦圖製做參考:https://github.com/lw-lin/CoolplaySpark
完整腦圖連接地址:https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png