The kernel mechanism of Spark Streaming: a background thread receives the stream of source data, the received records are saved to the BlockManager and packaged into batch RDDs at each batch interval, and the usual RDD operations are then executed on those batches.
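As a concrete reference point, here is a minimal driver program (a sketch using the built-in socket source; the host and port are placeholders) that exercises exactly this pipeline: a receiver stores incoming data, the data is cut into an RDD per batch interval, and operations run on each batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1s batch interval

    // The receiver runs on an executor and stores records via the BlockManager.
    val lines = ssc.socketTextStream("localhost", 9999)

    // These operations are applied to the RDD generated for each batch.
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}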
Each data source provides its own Receiver implementation and calls store() on the data it receives. store() hands the records to ReceiverSupervisorImpl, which does the actual saving; ReceiverSupervisorImpl calls methods such as pushSingle to append records to the BlockGenerator. The BlockGenerator starts a RecurringTimer that fires at the interval configured by the program, taking all records received so far out of the BlockGenerator's buffer so they can be assembled into the RDDs of the stream's batches. Throughout this pipeline, events are reported through a BlockGeneratorListener.
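As an illustration of the store() entry point, here is a sketch of a custom receiver (modeled on the pattern from Spark's custom-receiver guide; the class name, host, and port are hypothetical). Every record passed to store() flows through ReceiverSupervisorImpl's pushSingle into the BlockGenerator's current buffer.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Receive on a dedicated thread so that onStart() returns immediately.
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to do: the receive thread checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // single-record path: pushSingle -> currentBuffer
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}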
Each real-time data source also needs to implement its own DStream subclass; for receiver-based sources this usually means extending ReceiverInputDStream, as sketched below.
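A matching input stream for the hypothetical LineReceiver above: the source only has to say which Receiver to run, since slicing the received data into per-batch RDDs is inherited from ReceiverInputDStream. The constructor parameter is named _ssc to avoid shadowing the inherited ssc field, the same convention Spark's own SocketInputDStream uses.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class LineInputDStream(_ssc: StreamingContext, host: String, port: Int)
  extends ReceiverInputDStream[String](_ssc) {

  override def getReceiver(): Receiver[String] = new LineReceiver(host, port)
}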
In receiver/BlockGenerator.scala: the BlockGenerator receives a DStream's data and saves it into blocksForPushing. For each real-time stream to be received, a BlockGenerator is started; it quietly receives data in the background and saves it to the BlockManager, while producing blocks and appending them to the blocksForPushing queue.
The BlockGenerator's default block interval is 200ms and can be configured through spark.streaming.blockInterval (see the sketch after the code below). The timer's callback looks like this:
The time argument is an integer multiple of the timer period, so time - blockIntervalMs identifies the just-finished interval and serves as the id of the block generated for it.

/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        // Swap in a fresh buffer so the receiver can keep storing records,
        // then wrap the old buffer as a block named after its interval.
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }
    if (newBlock != null) {
      blocksForPushing.put(newBlock) // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
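As noted above, the block interval is set with spark.streaming.blockInterval. A small configuration sketch follows (the app name and master are placeholders; newer Spark versions accept a duration string such as "100ms", while very old ones expect a plain millisecond value):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With a 2s batch interval and a 100ms block interval, each batch RDD is
// assembled from up to 20 blocks, i.e. up to 20 partitions.
val conf = new SparkConf()
  .setAppName("BlockIntervalDemo")
  .setMaster("local[2]")
  .set("spark.streaming.blockInterval", "100ms")

val ssc = new StreamingContext(conf, Seconds(2))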
ReceiverInputDStream, the base class of many receiver-based input streams, uses its compute method to generate the RDD[T] for a batch: given validTime, it obtains that batch's data from the receiverTracker (generally stored in the BlockManager) and wraps it as a BlockRDD[T].
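A simplified sketch of that compute path, assuming 1.x-era internals. The ReceiverTracker accessor names and the handling of write-ahead-log-backed blocks differ across Spark versions, so every call below is an approximation rather than the real source:

// Sketch only, inside a ReceiverInputDStream[T] subclass.
override def compute(validTime: Time): Option[RDD[T]] = {
  val receiverTracker = ssc.scheduler.receiverTracker
  // Assumed accessor: the block infos recorded for this stream id in the
  // batch ending at validTime (the exact method name varies by version).
  val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
  val blockIds = blockInfos.map(_.blockId.asInstanceOf[BlockId]).toArray
  // One partition per block pushed by the BlockGenerator during the batch.
  Some(new BlockRDD[T](ssc.sc, blockIds))
}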