Contents of this installment:
1. A thorough study of the relationship between DStream and RDD
2. A thorough study of how RDDs are generated in Spark Streaming
How is an RDD generated? What does RDD generation rely on, and on what basis is an RDD generated? Does RDD execution in Spark Streaming differ from RDD execution in Spark Core? And once a batch has run, what do we do with the RDDs it produced?
RDDs are themselves ordinary objects. For example, with a BatchInterval of 1 second, a new RDD is produced every second, and memory cannot hold all of these objects indefinitely. So after the job for each BatchInterval finishes, how are the accumulated RDDs managed?
A ForEachDStream does not necessarily trigger the execution of a Job; defining one is unrelated to when the Job actually runs.
Job generation is driven by the Spark Streaming framework itself.
foreachRDD is Spark Streaming's back door: it lets you operate directly on the RDD behind each batch.
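As a minimal sketch of that back door (the DStream[String] parameter here is an assumption, standing in for any input stream), foreachRDD exposes the concrete RDD generated for each batch, so ordinary Spark Core operations can be applied to it directly:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper for illustration: for every batch, take the first
// few records of that batch's RDD and print them with the batch time.
def dumpFirstRecords(stream: DStream[String]): Unit = {
  stream.foreachRDD { (rdd: RDD[String], time: Time) =>
    // `rdd` is the concrete RDD instance generated for the batch at `time`;
    // any RDD (Spark Core) operation can be used here.
    rdd.take(10).foreach(record => println(s"[$time] $record"))
  }
}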
A DStream is a template for RDDs, and each later DStream depends on the one before it.
val lines = jsc.socketTextStream("127.0.0.1", 9999) produces a SocketInputDStream.
lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print() then transforms the SocketInputDStream into a FlatMappedDStream, then a MappedDStream, then a ShuffledDStream, and finally a ForEachDStream.
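For reference, here is a minimal self-contained driver that produces exactly this DStream chain (a sketch; the local master and the 1-second BatchDuration are assumptions for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    // BatchDuration of 1 second: each DStream generates one RDD per second
    val jsc = new StreamingContext(conf, Seconds(1))

    // SocketInputDStream
    val lines = jsc.socketTextStream("127.0.0.1", 9999)
    // FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream (via print)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    jsc.start()
    jsc.awaitTermination()
  }
}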
The DStream class is described in the source code as follows:
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
Roughly, this means:
1. A DStream depends on other DStreams.
2. Every BatchDuration, the DStream generates an RDD.
3. Every BatchDuration, a function inside the DStream is used to generate that RDD.
DStream dependencies run from back to front. A DStream represents the Spark Streaming business logic; RDDs likewise depend on one another from back to front, and DStreams are lazy. The DStream dependency graph must therefore stay strictly consistent with the RDD dependency graph.
Inside the DStream class, generatedRDDs stores the RDD instances corresponding to different times, and every DStream instance has its own generatedRDDs. At execution time, because the computation is driven from back to front, it only needs to be triggered on the last DStream.
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

So how is generatedRDDs populated? In DStream's getOrCompute method, the given time is first looked up in the HashMap to see whether an RDD already exists for it; if not, compute is called to obtain the RDD, which is then put into the HashMap.
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
Take DStream's subclass ReceiverInputDStream as an illustration of the compute method; internally it calls createBlockRDD.
/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
createBlockRDD returns a BlockRDD; since ReceiverInputDStream has no parent dependency, it generates its RDD by itself.
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
Now take DStream's subclass MappedDStream: here compute calls the parent DStream's getOrCompute method to obtain the parent's RDD, then applies the map operation to it.
private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
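Every non-input DStream follows this same pattern: delegate to the parent's getOrCompute for the batch's RDD, then apply the corresponding RDD transformation. As a sketch, a filter-style DStream could be written as follows (Spark itself ships a FilteredDStream of essentially this shape; note that such a class must live in the org.apache.spark.streaming.dstream package, because getOrCompute is private[streaming]):

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.dstream.DStream

// Sketch of a filter-style DStream: same skeleton as MappedDStream,
// only the RDD operation applied in compute differs.
private[streaming] class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    // Ask the parent for this batch's RDD, then apply filter to it
    parent.getOrCompute(validTime).map(_.filter(filterFunc))
  }
}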
From ReceiverInputDStream and MappedDStream we can see that the first DStream, i.e. the InputDStream, fetches data and computes by itself in its compute method, whereas every other DStream depends on its parent DStream, calling the parent's getOrCompute method and then performing its own computation.
This demonstrates that operations on DStreams ultimately become operations on RDDs.
Next, look at another DStream subclass, ForEachDStream. Its compute method does nothing, but it overrides the generateJob method.
private[streaming] class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
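Output operations are what register a ForEachDStream on the graph. For instance, print() is built on foreachRDD; the following is a simplified sketch of DStream.print from the Spark 1.x source (formatting details condensed):

// Simplified sketch of DStream.print: wrap a "take and println" function
// in foreachRDD, which creates and registers a ForEachDStream.
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
    }
  }
  // foreachRDD is the generic output operator behind print()
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}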
Now start from Job generation: JobGenerator's generateJobs method internally calls DStreamGraph's generateJobs method.
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Fetch the concrete data for the given time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
DStreamGraph's generateJobs method calls generateJob on each output stream, and the output stream here is the ForEachDStream.
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
Summary: a DStream is a template for RDDs; its internal generatedRDDs holds the RDD instance generated for each BatchDuration. The DStream dependencies mirror the RDD dependencies, so when computing from back to front, it suffices to trigger the last DStream. Every BatchDuration, JobGenerator calls DStreamGraph's generateJobs method, which in turn calls ForEachDStream's generateJob method; that method first calls the parent DStream's getOrCompute to obtain the RDD and then performs the computation. Following the chain from back to front, the first DStream is the ReceiverInputDStream, whose compute method obtains the metadata for the corresponding time interval from the receiverTracker, generates a BlockRDD, and puts it into generatedRDDs.
Notes:
1. DT Big Data Dream Factory WeChat public account: DT_Spark
2. IMF 8pm big-data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains