This series of posts distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and guidance for putting them into practice; please keep following the series. Copyright notice: this series of Spark source-code analyses and practical guides belongs entirely to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.
JobGenerator periodically produces Jobs, which are ultimately executed on the Executors.
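To make the rest of the walkthrough concrete, here is a minimal driver program that exercises exactly this pipeline. It is only a sketch: the socket source on localhost:9999, the application name, and the 2-second batch interval are illustrative choices, not anything prescribed by the source code discussed below.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    // 2-second batches: JobGenerator will fire a GenerateJobs event every 2 s.
    val conf = new SparkConf().setMaster("local[2]").setAppName("JobGeneratorWalkthrough")
    val ssc = new StreamingContext(conf, Seconds(2))

    // A receiver-based input stream; the blocks it produces are what ReceiverTracker tracks.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()            // starts JobScheduler, which starts ReceiverTracker and JobGenerator
    ssc.awaitTermination()
  }
}
```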
As the snippet below shows, receivedBlockTracker is a member of ReceiverTracker. Most importantly, receivedBlockTracker internally maintains streamIdToUnallocatedBlockQueues, which tracks the blocks that Executors report.
```scala
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
```
The key metadata structure maintained inside receivedBlockTracker:
```scala
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
```
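To see how this map is used, here is a deliberately simplified stand-in (not the Spark source): receivers report blocks that get appended to a per-stream queue, and at batch time every queue is drained, mirroring the dequeueAll call in allocateBlocksToBatch shown further down. The ReceivedBlockInfo case class and the class name UnallocatedBlockQueues here are cut-down placeholders for illustration.

```scala
import scala.collection.mutable

// Simplified stand-in types, for illustration only.
case class ReceivedBlockInfo(streamId: Int, numRecords: Option[Long])

class UnallocatedBlockQueues {
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

  private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)

  // Executor-side receivers report blocks; the tracker appends them to the per-stream queue.
  def addBlock(info: ReceivedBlockInfo): Unit = synchronized {
    getReceivedBlockQueue(info.streamId) += info
  }

  // At batch time every queue is drained, mirroring allocateBlocksToBatch.
  def drainAll(streamIds: Seq[Int]): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    streamIds.map(id => id -> getReceivedBlockQueue(id).dequeueAll(_ => true)).toMap
  }
}
```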
JobScheduler contains two heavyweight core members, jobGenerator and receiverTracker, initialized as follows:
```scala
private val jobGenerator = new JobGenerator(this)
receiverTracker = new ReceiverTracker(ssc)
```
Inside JobGenerator, a RecurringTimer fires once per batchDuration and posts a GenerateJobs event to the event loop:

```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```
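Conceptually, the timer is just a thread that wakes up once per batch interval, aligned to interval boundaries, and invokes the callback, which here posts GenerateJobs. The following is a stripped-down sketch of that behaviour, not Spark's RecurringTimer (which additionally supports graceful stop and a pluggable clock); SimpleRecurringTimer is a made-up name.

```scala
// A minimal recurring-timer sketch: call `callback(time)` once per `periodMs`,
// aligned to period boundaries.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit = {
      // Start at the next period boundary, so batch times line up with the interval.
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)          // e.g. post GenerateJobs(new Time(nextTime))
        nextTime += periodMs
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}
```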
JobGenerator.start() wires up that event loop and then either restarts from a checkpoint or starts for the first time:

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
On a first start (no checkpoint), both the DStreamGraph and the timer are started at the first batch boundary:

```scala
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
```
JobGenerator holds a reference to jobScheduler, and jobScheduler holds a reference to receiverTracker. So when a GenerateJobs(time) event is handled, the blocks received so far are first allocated to the batch:
```scala
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
```
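For context, this call sits inside JobGenerator.generateJobs, which ties block allocation, job generation, and job submission together. The following is a paraphrased sketch of that method; details vary slightly between Spark versions.

```scala
private def generateJobs(time: Time): Unit = {
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time)                                 // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```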
receiverTracker in turn holds a reference to receivedBlockTracker.
allocateBlocksToBatch takes every block for each streamId out of streamIdToUnallocatedBlockQueues (the blocks were cut at the block interval, 200 ms by default) and puts them into timeToAllocatedBlocks.
```scala
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // First, drain every block queued in streamIdToUnallocatedBlockQueues.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap // <= key point
    // Then wrap the drained blocks and record them against this batch in timeToAllocatedBlocks.
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) // <= key point
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed again,
    //    so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
```
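How many blocks end up in one batch is governed by the ratio of the batch interval to the block interval, and each block later becomes one partition of the BlockRDD built for the batch. A small configuration sketch, assuming the 2-second batch from the earlier example:

```scala
import org.apache.spark.SparkConf

// With a 2 s batch and the default 200 ms block interval, each receiver contributes
// roughly 2000 / 200 = 10 blocks, i.e. about 10 BlockRDD partitions per batch.
val conf = new SparkConf()
  .setAppName("BlockIntervalTuning")
  .set("spark.streaming.blockInterval", "200ms") // default; lower it for more partitions
```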
Job generation then walks the DStream lineage. DStream.getOrCompute either returns the RDD already generated for this batch time or computes it, caches it in generatedRDDs, and registers it for persistence and checkpointing:

```scala
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD) // <= key point
      }
      rddOption
    } else {
      None
    }
  }
}
```
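The essential pattern in getOrCompute is per-batch memoization: the first caller for a given time computes the RDD and caches it in generatedRDDs, so every downstream DStream that recurses into the same parent for the same batch reuses one RDD. A minimal sketch of just that pattern (PerBatchCache is a made-up class for illustration, not the Spark source):

```scala
import scala.collection.mutable

// Minimal memoization sketch of the generatedRDDs pattern.
class PerBatchCache[V] {
  private val generated = new mutable.HashMap[Long, V]

  def getOrCompute(timeMs: Long)(compute: => Option[V]): Option[V] =
    generated.get(timeMs).orElse {
      val result = compute                       // compute only on a cache miss
      result.foreach(generated.put(timeMs, _))   // cache so later callers for this batch reuse it
      result
    }
}
```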
For a derived stream such as MappedDStream, compute simply recurses into the parent's getOrCompute and transforms the result:

```scala
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
```
At the head of the lineage, ReceiverInputDStream.compute builds the batch's RDD from the blocks that were allocated to it:

```scala
/** Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker // <= key point
      // The blocks come out of timeToAllocatedBlocks via getBlocksOfBatch.
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // <= key point

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD from those blocks, so the downstream call chain can
      // consume it through generatedRDDs.
      createBlockRDD(validTime, blockInfos) // <= key point
    }
  }
  Some(blockRDD)
}
```
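getBlocksOfBatch is the read side of the allocation done earlier: ReceiverTracker delegates to ReceivedBlockTracker, which looks the batch up in timeToAllocatedBlocks. Roughly (paraphrased from the two trackers; exact member names may differ by Spark version):

```scala
// ReceiverTracker: delegate to the block tracker.
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
  receivedBlockTracker.getBlocksOfBatch(batchTime)
}

// ReceivedBlockTracker: read what allocateBlocksToBatch stored for this batch time.
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
  timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}
```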
So the whole point of allocateBlocksToBatch is to move the blocks that belong to a batch window into timeToAllocatedBlocks, where the rest of the call chain can consume them.
Back on the graph side: DStreamGraph keeps the registered input and output streams, and generateJobs asks every output stream to produce a Job for the batch:

```scala
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
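outputStreams is populated by output operations such as print and foreachRDD, each of which registers a ForEachDStream on the graph; that is what gives generateJobs one candidate Job per output operation per batch. A usage sketch, reusing the hypothetical `lines` stream from the first example:

```scala
// Registers an output stream on the DStreamGraph; its generateJob(time) will wrap this
// function into a Job once per batch.
lines.foreachRDD { (rdd, time) =>
  println(s"Batch $time contains ${rdd.count()} records")
}
```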
The default DStream.generateJob wraps the batch's RDD into a Job whose body is a sparkContext.runJob call (output DStreams override this to run their real output function):

```scala
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
InputInfoTracker keeps the reported metadata per batch time and per input stream; each report is a StreamInputInfo:

```scala
// Map to track all the InputInfo related to specific batch time and input stream.
private val batchTimeToInputInfos =
  new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
```
Finally, JobScheduler.submitJobSet records the JobSet and hands each Job to the jobExecutor thread pool, where a JobHandler runs it:

```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
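submitJobSet also posts StreamingListenerBatchSubmitted to the listener bus, so the per-stream record counts reported through StreamInputInfo surface in BatchInfo. A small sketch of observing them with a StreamingListener, assuming the `ssc` from the first example:

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchSubmitted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    val info = batchSubmitted.batchInfo
    // numRecords is the sum of StreamInputInfo.numRecords over all input streams of the batch.
    println(s"Batch ${info.batchTime}: ${info.numRecords} records submitted")
  }
})
```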
The author put a great deal of time into this write-up. Please do not simply copy it: reproduction is prohibited, learning from it is welcome, and questions can be left in the comments.
Qin Kaixin, Shenzhen, 2018