This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and hands-on guidance. Please stay tuned. Copyright notice: this Spark source-code reading and business-practice series belongs entirely to the author (秦凱新); reproduction is prohibited, learning from it is welcome.
The diagram below dissects how ReceiverTracker submits the job for the receiver RDD. The yellow-shaded area on the right is the focus of this section, ReceiverSupervisorImpl, which implements receiver startup, Block generation, and the related plumbing.
The diagram also shows clearly that the supervisor acts as the parent class: inside StartReceiverFunc, its start() triggers two steps:
/** Start the supervisor */
def start() {
  onStart()        // start the registered BlockGenerators first
  startReceiver()  // then start the receiver itself
}
(1) The first call runs ReceiverSupervisorImpl's onStart() method, which starts every generator in registeredBlockGenerators and so kicks off the generation and management of data batches. The generators must be running before the receiver starts pushing data, which is why onStart() comes first.
override protected def onStart() {
registeredBlockGenerators.asScala.foreach { _.start() }
}
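Where do these generators come from? For reference, a lightly abridged sketch of ReceiverSupervisorImpl.createBlockGenerator, following the Spark 2.x source this series reads (treat the exact body as an assumption on other versions): each caller gets a fresh BlockGenerator, which is registered so that onStart() above can start it.

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Drop generators that have already been stopped before registering a new one
  val stoppedGenerators = registeredBlockGenerators.asScala.filter { _.isStopped() }
  stoppedGenerators.foreach(registeredBlockGenerators.remove(_))

  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators.add(newBlockGenerator)
  newBlockGenerator
}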
(2) The second call, startReceiver, in turn invokes ReceiverSupervisorImpl's onReceiverStart method to check whether the receiver was successfully registered with the ReceiverTracker; only on success is the receiver itself started.
The supervisor's startReceiver method:
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {  // <= key step: register the receiver with ReceiverTracker over the RPC endpoint
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart()  // <= key step: start the user-defined receiver
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
ReceiverSupervisorImpl's onReceiverStart method:
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askSync[Boolean](msg)
}
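For reference, the messages exchanged over this endpoint form a small sealed hierarchy; a sketch matching Spark 2.x's private ReceiverTrackerMessage (an assumption to verify against your version). These are exactly the cases matched in receiveAndReply below:

private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef
  ) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceiverTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
  extends ReceiverTrackerMessage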
(3) How ReceiverTracker manages receiver registration requests
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint,
        context.senderAddress)  // <= key step: maintains receiverTrackingInfos
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case GetAllReceiverInfo =>
    context.reply(receiverTrackingInfos.toMap)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
(4) How registerReceiver manages receiverTrackingInfos
/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)  // <= key step: record the ACTIVE receiver
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
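For reference, the record kept in receiverTrackingInfos is roughly the following case class (a sketch of Spark 2.x's ReceiverTrackingInfo; fields may differ across versions):

private[streaming] case class ReceiverTrackingInfo(
    receiverId: Int,
    state: ReceiverState,
    scheduledLocations: Option[Seq[TaskLocation]],
    runningExecutor: Option[ExecutorCacheTaskLocation],
    name: Option[String] = None,
    endpoint: Option[RpcEndpointRef] = None,
    errorInfo: Option[ReceiverErrorInfo] = None)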
The whole data path rests on ReceiverSupervisor. Taking SocketReceiver as the concrete example, its receive() loop hands every record to the supervisor:
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  try {
    val iterator = bytesToObjects(socket.getInputStream())
    while (!isStopped && iterator.hasNext) {
      store(iterator.next())  // <= key step: hand each record to the supervisor
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    onStop()
  }
}
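To see where this loop comes alive in user code, here is a minimal driver program (assuming a text server on localhost:9999, e.g. started with `nc -lk 9999`) that creates the SocketInputDStream whose SocketReceiver runs the receive() loop above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketDemo")
    val ssc = new StreamingContext(conf, Seconds(2))

    // socketTextStream registers a SocketInputDStream; at ssc.start() the
    // ReceiverTracker submits the receiver RDD job that drives the flow above
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}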
store, in turn, relies on ReceiverSupervisor's pushSingle method:
/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)  // <= key step: delegate to the supervisor
}
pushSingle relies on the supervisor's internal defaultBlockGenerator:
/** Push a single record of received data into the block generator. */
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)  // <= key step: buffer the record for block assembly
}
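Where does defaultBlockGenerator come from, and where do its finished blocks go? A lightly abridged sketch of the supervisor's default listener wiring, following the Spark 2.x source (treat exact signatures as an assumption on other versions); onPushBlock is the hook that forwards each completed block toward the BlockManager and the driver:

/** Divides received data records into data blocks and pushes them on completion. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }
  def onGenerateBlock(blockId: StreamBlockId): Unit = { }
  def onError(message: String, throwable: Throwable): Unit = {
    reportError(message, throwable)
  }
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))  // store the block and report it to the driver
  }
}
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)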
Now BlockGenerator strides onto the stage, sword in hand, poetry and wine while youth lasts. Two big threads between them solve block storage and management:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")

Thread one, the block-interval timer:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

Thread two, the block-pushing thread:

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
Off the two brothers go:
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()  // <= key step: seal the buffer into a block every blockIntervalMs
    blockPushingThread.start()  // <= key step: drain finished blocks from the queue
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
The buffer tank (currentBuffer), which collects the discrete records of the stream:
@volatile private var currentBuffer = new ArrayBuffer[Any]
The bottled water (blocksForPushing), into which the tank's water is gathered bucket by bucket:
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
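The queue's capacity comes from a config key; for reference, as defined in the Spark 2.x BlockGenerator (default 10 blocks):

private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)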
The buffer tank takes in water through the InputDStream:
/** Push a single data item into the buffer. */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()  // rate limiting: block until another record may be pushed
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
blockIntervalTimer turns the buffer tank into bottled water and brings it under management:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)  // <= key step: id named after the interval that produced it
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)  // <= key step: seal the buffer into a block
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full <= key step
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
keepPushingBlocks: watch it stir up the wind and clouds.
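For completeness, a lightly abridged sketch of keepPushingBlocks, following the Spark 2.x BlockGenerator (treat the details as an assumption on other versions): it polls blocksForPushing and hands each block to pushBlock, then drains whatever remains on shutdown:

/** Keep polling blocksForPushing and pushing the dequeued blocks. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // The generator has stopped, so drain the remaining blocks in the queue
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      pushBlock(blocksForPushing.take())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}

pushBlock simply fires the listener hook, which on the default path ends in ReceiverSupervisorImpl.pushAndReportBlock: the block is stored through the ReceivedBlockHandler and then reported to the driver with the very AddBlock message that ReceiverTracker's receiveAndReply handles above. A sketch under the same assumptions:

private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)  // => pushArrayBuffer => pushAndReportBlock
  logInfo("Pushed block " + block.id)
}

/** Store block and report it to the driver-side ReceiverTracker. */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))  // <= the AddBlock message handled above
  logDebug(s"Reported block $blockId")
}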
And so we have watched ReceiverSupervisorImpl put on the show of the century: writing, directing, and starring all by itself, it solves the end-to-end problem.
秦凱新, Shenzhen, 2018