This article was exported from a mind map and lightly adjusted. Mind-map tools do not render code well, so for better readability the source code quoted here is pseudocode with non-essential code removed. For the best experience you can download the mind map and read along with it.

This post is divided into four parts:

Data generation and import consists of the following five parts.

ReceiverTracker, the coordinator of all Receivers, distributes multiple jobs (each job has exactly 1 task) to multiple executors, and each of them starts a ReceiverSupervisor instance.

Get the Receivers from the ReceiverInputDStreams and send them to all the worker nodes:
```scala
class ReceiverTracker {
  var endpoint: RpcEndpointRef = _

  private def launchReceivers() {
    // inputStreams is a field of DStreamGraph
    val receivers = inputStreams.map { nis =>
      // rcvr defines how data is received from Kafka, sockets, etc.
      val rcvr = nis.getReceiver()
      rcvr
    }
    // send to the workers
    endpoint.send(StartAllReceivers(receivers))
  }
}
```
Destination selection covers two cases: initial scheduling and rescheduling after a failure.
```scala
class ReceiverTracker {
  // computes where receivers should be placed
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive {
    // initial start
    case StartAllReceivers(receivers) => ...
    // restart after failure
    case RestartReceiver(receiver) => ...
  }
}
```
1. Choose the best executor locations
2. Iterate over the receivers and start each one on its scheduled executors
```scala
class ReceiverTracker {
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive {
    // initial start
    case StartAllReceivers(receivers) =>
      // 1. choose the best executor locations
      val scheduledLocations = schedulingPolicy.scheduleReceivers(
        receivers, getExecutors)
      // 2. iterate and start each receiver on its scheduled executors
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        startReceiver(receiver, executors)
      }
    // restart after failure
    case RestartReceiver(receiver) => ...
  }
}
```
1. Get the previously scheduled executors
2. Compute new executor locations
2.1 If the previous executors are still available, reuse them
2.2 Otherwise recompute the locations
3. Send to the worker to restart the receiver
```scala
class ReceiverTracker {
  val schedulingPolicy = new ReceiverSchedulingPolicy()

  def receive {
    // initial start
    case StartAllReceivers(receivers) => ...
    // restart after failure
    case RestartReceiver(receiver) =>
      // 1. get the previously scheduled executors
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // 2. compute new executor locations
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // 2.1 previous executors are still available, reuse them
        oldScheduledExecutors
      } else {
        // 2.2 otherwise recompute the locations
        schedulingPolicy.rescheduleReceiver()
      }
      // 3. send to the worker to restart the receiver
      startReceiver(receiver, scheduledLocations)
  }
}
```
The scheduling policy is implemented by ReceiverSchedulingPolicy, and the default policy is round-robin. Before version 1.5, distribution relied on Spark Core's generic TaskScheduler.

Before 1.5, uneven distribution of receivers across executors could cause the Job to fail:

If a Task fails more than spark.task.maxFailures (default = 4) times, the whole Job fails. In a long-running Spark Streaming application, a few executor failures can easily push a Task to that limit. When a Task fails, Spark Core's TaskScheduler redeploys it onto another executor to rerun; the problem is that the executor chosen for the rerun is simply the one running the fewest Tasks at that moment, which is not necessarily the placement that keeps the Receivers best balanced.
The policy entry points:
```scala
val scheduledLocations = ReceiverSchedulingPolicy.scheduleReceivers(receivers, executors)
val scheduledLocations = ReceiverSchedulingPolicy.rescheduleReceiver(receiver, ...)
```
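To make the round-robin idea concrete, here is a simplified, self-contained sketch of assigning receivers to executors in round-robin order. It is an illustration only, not the actual ReceiverSchedulingPolicy code, which also weights executors by how many receivers they already host and honors each receiver's preferredLocation.

```scala
// Illustrative only: assign receiver i to executor (i % numExecutors).
// The real ReceiverSchedulingPolicy is considerably more elaborate.
def roundRobinSchedule(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] = {
  require(executors.nonEmpty, "no executors to schedule on")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> executors(i % executors.size)
  }.toMap
}

// e.g. roundRobinSchedule(Seq(0, 1, 2, 3), Seq("host1", "host2"))
//   -> Map(0 -> "host1", 1 -> "host2", 2 -> "host1", 3 -> "host2")
```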
Wrap the receiver into an RDD:
```scala
class ReceiverTracker {
  def receive {
    ...
    startReceiver(receiver, executors)
  }

  def startReceiver(
      receiver: Receiver[_],
      scheduledLocations: Seq[TaskLocation]) {
    ...
    // wrap the single receiver into a 1-partition RDD,
    // carrying the preferred locations when they have been computed
    val receiverRDD: RDD[Receiver[_]] =
      if (scheduledLocations.isEmpty) {
        ssc.sc.makeRDD(Seq(receiver), 1)
      } else {
        val preferredLocations = scheduledLocations.map(_.toString).distinct
        ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
      }
    receiverRDD.setName(s"Receiver $receiverId")
    ...
  }
}
```
Each receiver, together with the Spark environment, the Hadoop configuration, the checkpoint directory, and so on, is handed to the executor-side ReceiverSupervisorImpl:
```scala
class ReceiverTracker {
  def startReceiver(...) {
    ...
    // the function executed on the executor for each receiver:
    // wrap it into a ReceiverSupervisorImpl and block until it terminates
    val startReceiverFunc: Iterator[Receiver[_]] => Unit =
      (iterator: Iterator[Receiver[_]]) => {
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver,
          SparkEnv,
          HadoopConf,
          checkpointDir)
        supervisor.start()
        supervisor.awaitTermination()
      }
    ...
  }
}
```
Submit the RDD and the function defined in the previous two steps from the driver to the executors:
```scala
class ReceiverTracker {
  def startReceiver(...) {
    ...
    // submit the receiver RDD together with the start function as a Spark job
    val future = ssc.sparkContext.submitJob(
      receiverRDD,
      startReceiverFunc,
      ...)
    ...
  }
}
```
On the executor side, startup behaviour is declared in the Receiver class, invoked by ReceiverSupervisor, and implemented by Receiver subclasses.

Two threads need to be started on the executor:

- 1. Start the Receiver to receive data
- 2. Start the pushing thread to periodically push data to the driver
```scala
class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }
}
```
Start the Receiver and begin receiving data:
```scala
class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }

  // 1. start the Receiver and begin receiving data
  def startReceiver() {
    receiverState = Started
    receiver.onStart()
  }
}
```
Start the pushing thread, which periodically pushes data to the driver:
```scala
class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf) {

  def start() {
    onStart()
    startReceiver()
  }

  // 1. start the Receiver and begin receiving data
  def startReceiver() {
    receiverState = Started
    receiver.onStart()
  }

  // 2. start the pushing thread(s) that periodically push data to the driver
  def onStart() {
    registeredBlockGenerators.asScala.foreach { _.start() }
  }
}

// what _.start() does
class BlockGenerator {
  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
  }
}
```
Start the Receiver instance and keep the current thread blocked.

Before version 1.5 a job contained multiple tasks, and once a single task had failed more than 4 times the whole Job failed. Since 1.5 each job contains exactly one task and a retry mechanism was added, which greatly improves the liveness of the job.

Spark Core's task scheduling only treats the preferredLocation set by Spark Streaming as a hint (though it respects it most of the time), so the Job that distributes a Receiver may still not run on the executor we intended. Therefore, on its first execution the Task first sends a RegisterReceiver message to the ReceiverTracker and only starts the Receiver if it gets a positive reply; otherwise it does nothing, and the Job still finishes as successfully completed. The ReceiverTracker then launches another Job to retry the Receiver distribution, and so on until it succeeds.

A Receiver-distribution Job can therefore finish without actually having placed the Receiver, in which case the ReceiverTracker starts another Job to try again. This mechanism guarantees that if a Receiver does not land on the pre-computed executor, it gets another chance to be distributed, giving Spark Streaming much finer control over where Receivers run.

The monitoring and restart mechanism for Receivers

Having seen that every Receiver gets its own dedicated distribution Job, we find that Receiver failure and restart are no longer bounded by spark.task.maxFailures (default = 4).

This is because Receiver retries now happen at the Job level rather than the Task level, and a Receiver failure does not fail the previous Job: the previous Job completes successfully and a new Job is launched to redistribute the Receiver. As a result, no matter how long the Spark Streaming application runs, the Receivers stay alive and do not die with lost executors.
Wait until the executor returns the result of the submission:
```scala
class ReceiverTracker {
  def startReceiver(...) {
    ...
    future.onComplete {
      case Success(_) =>
        ...
      case Failure(_) =>
        onReceiverJobFinish(receiverId)
        ...
    }(ThreadUtils.sameThread)
    ...
  }
}
```
After each ReceiverSupervisor starts, it immediately instantiates the user-provided Receiver implementation — a Receiver implementation that can continuously produce or receive external data; for example, a TwitterReceiver can crawl Twitter data in real time — and once the Receiver instance is created it calls Receiver.onStart().

Data reception is implemented by the Receiver on the executor side: start and stop must be implemented by subclasses, while the store methods are implemented in the base class for subclasses to call.
```scala
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  // start and stop must be implemented by subclasses
  def onStart()
  def onStop()

  // store a single small data item
  def store(dataItem: T) {...}
  // store a block of data as an ArrayBuffer
  def store(dataBuffer: ArrayBuffer[T]) {...}
  // store a block of data as an iterator
  def store(dataIterator: Iterator[T]) {...}
  // store a block of data as a ByteBuffer
  def store(bytes: ByteBuffer) {...}
  ...
}
```
Example: receiving data through Kafka.
```scala
class KafkaInputDStream(
    _ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel) extends Receiver(storageLevel) {

  def onStart() {
  }
}
```
Assemble the parameters needed by the Kafka consumer:
```scala
class KafkaInputDStream() {
  def onStart() {
    // 1. gather the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. gather the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. gather the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)

    // 3. listen to all topics and hand the streams to a thread pool
    val topicMessageStreams = consumerConnector.createMessageStreams(topics)
    val executorPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum,
      "KafkaMessageHandler")
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        executorPool.submit(new MessageHandler(stream))
      }
    }
  }
}
```
```scala
class KafkaInputDStream() {
  // Kafka connector
  var consumerConnector: ConsumerConnector = _

  def onStart() {
    // 1. gather the Kafka parameters
    val props = new Properties()
    kafkaParams.foreach(p => props.put(p._1, p._2))

    // 2. connect to Kafka
    val consumerConf = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConf)

    // 3. listen to all topics and hand the streams to a thread pool
    val topicMessageStreams = consumerConnector.createMessageStreams(topics)
    val executorPool = ThreadUtils.newDaemonFixedThreadPool(
      topics.values.sum,
      "KafkaMessageHandler")
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        executorPool.submit(new MessageHandler(stream))
      }
    }
  }

  // 4. store the data asynchronously
  class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
    def run {
      val streamIterator = stream.iterator()
      while (streamIterator.hasNext()) {
        val msgAndMetadata = streamIterator.next()
        store((msgAndMetadata.key, msgAndMetadata.message))
      }
    }
  }
}
```
A custom Receiver only needs to extend the Receiver class, start its own data-receiving thread inside onStart(), and call store() to hand the data over to the Spark Streaming framework as it arrives.
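As an illustration, here is a minimal custom-receiver sketch in the spirit of the official custom receiver guide; the host/port parameters and the simple newline-delimited protocol are assumptions made for the example.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal sketch: read newline-separated text from a socket
// (host/port are example parameters) and hand each line to Spark Streaming.
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // start a dedicated thread so that onStart() returns immediately
    new Thread("Simple Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // the receive loop checks isStopped(), so nothing extra to do here
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)              // hand the data to the framework
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // try to reconnect when the connection closes
      restart("Trying to connect again")
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    }
  }
}
```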
Once onStart() has run, the Receiver keeps receiving external data and continuously hands it to the ReceiverSupervisor for storage.

When the Receiver calls store(), the different overloads delegate to different methods of ReceiverSupervisor, whose methods are implemented by ReceiverSupervisorImpl.
```scala
class Receiver {
  var supervisor: ReceiverSupervisor = _

  // 1. a single data item
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }
  // 2. a ByteBuffer
  def store(bytes: ByteBuffer) {
    supervisor.pushBytes(bytes, None, None)
  }
  // 3. an iterator
  def store(dataIterator: Iterator[T]) {
    supervisor.pushIterator(dataIterator)
  }
  // 4. an ArrayBuffer
  def store(dataBuffer: ArrayBuffer[T]) {
    supervisor.pushArrayBuffer(dataBuffer)
  }
}
```
A single item is saved by calling ReceiverSupervisorImpl.pushSingle:
```scala
class ReceiverSupervisorImpl {
  val defaultBlockGenerator = new BlockGenerator(
    blockGeneratorListener,
    streamId,
    env.conf)

  def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }
}
```
First the ingestion rate is checked; by limiting the rate we bound the maximum amount of data each batch has to process.

When data is appended to the currentBuffer array, rateLimiter first checks whether items are being added too fast. If so, the call blocks until the next second before the item is added. The maximum rate is controlled by spark.streaming.receiver.maxRate (default = Long.MaxValue), the number of records a single Receiver may add per second. Controlling this rate bounds the maximum amount of data the whole Spark Streaming system has to process per batch.
```scala
class BlockGenerator {
  def addData(data: Any) = {
    // 1. check the ingestion rate
    waitToPush()
  }
}

class RateLimiter(conf: SparkConf) {
  val maxRateLimit = conf.getLong(
    "spark.streaming.receiver.maxRate",
    Long.MaxValue)
  val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)

  def waitToPush() {
    rateLimiter.acquire()
  }
}
```
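Since the limit is read from SparkConf, it can be set when the application is built. A minimal sketch — the rate value and application name here are only examples:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Example only: cap each receiver at 10,000 records per second.
val conf = new SparkConf()
  .setAppName("receiver-rate-limit-demo")
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(5))
```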
Once the rate check passes, the data is appended to currentBuffer; if the generator is no longer active, an exception is thrown instead.
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  def addData(data: Any) = {
    // 1. check the ingestion rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data ...")
      }
    }
  }
}
```
3.1 Swap out (clear) currentBuffer
3.2 Put the new block into the block queue
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  // timer that periodically rolls currentBuffer into a new block
  val blockIntervalTimer = new RecurringTimer(
    clock,
    blockIntervalMs,
    updateCurrentBuffer,
    "BlockGenerator")

  // size of the queue holding generated blocks, default 10
  val queueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

  def addData(data: Any) = {
    // 1. check the ingestion rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      currentBuffer += data
    }
  }

  def updateCurrentBuffer(time: Long) {
    var newBlock: Block = null
    synchronized {
      // 3.1 swap out currentBuffer
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      // 3.2 put the new block into the block queue
      newBlock = new Block(id, newBlockBuffer)
      blocksForPushing.put(newBlock)
    }
  }
}
```
When the BlockGenerator is initialized, a thread is started that keeps running keepPushingBlocks(). While blocks are still being generated, it polls the queue with a timeout (queue.poll); once generation has stopped, it drains whatever blocks remain with queue.take().
```scala
class BlockGenerator {
  var currentBuffer = new ArrayBuffer[Any]

  // timer that periodically rolls currentBuffer into a new block
  val blockIntervalTimer = new RecurringTimer(
    clock,
    blockIntervalMs,
    updateCurrentBuffer,
    "BlockGenerator")

  // size of the queue holding generated blocks, default 10
  val queueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

  // thread that pushes the blocks
  val blockPushingThread = new Thread() {
    override def run() { keepPushingBlocks() }
  }

  def addData(data: Any) = {
    // 1. check the ingestion rate
    waitToPush()
    // 2. append the data to currentBuffer
    synchronized {
      currentBuffer += data
    }
  }

  def updateCurrentBuffer(time: Long) {
    var newBlock: Block = null
    synchronized {
      // 3.1 swap out currentBuffer
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      // 3.2 put the new block into the block queue
      newBlock = new Block(id, newBlockBuffer)
      blocksForPushing.put(newBlock)
    }
  }

  def keepPushingBlocks() {
    // 4.1 while blocks are still being generated, poll with a timeout
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(waitingTime)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }
    // 4.2 generation has stopped: drain the remaining blocks
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      pushBlock(block)
    }
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock)
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock)
    val blockInfo = ReceivedBlockInfo(
      streamId,
      numRecords,
      metadataOption,
      blockStoreResult)
    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushBytes(
      bytes: ByteBuffer,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushIterator(
      iterator: Iterator[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
  }
}
```
```scala
class ReceiverSupervisorImpl {
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }
}
```
ReceivedBlockHandler has two concrete storage-strategy implementations:

(a) BlockManagerBasedBlockHandler, which stores directly into the executor's memory or disk
(b) WriteAheadLogBasedBlockHandler, which first writes to a WAL and then stores into the executor's memory or disk

Storage is delegated to the BlockManager: putIterator (or putBytes) is called on the BlockManager, which then takes care of replication across executors and the caching policy.
```scala
class BlockManagerBasedBlockHandler(
    blockManager: BlockManager,
    storageLevel: StorageLevel) extends ReceivedBlockHandler {

  def storeBlock(blockId, block) {
    var numRecords: Option[Long] = None

    val putSucceeded: Boolean = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size)
        blockManager.putIterator(
          blockId,
          arrayBuffer.iterator,
          storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(
          blockId,
          countIterator,
          storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(
          blockId,
          // ChunkedByteBuffer: splits the byte buffer into chunks
          // byteBuffer.duplicate(): makes a copy of the buffer
          new ChunkedByteBuffer(byteBuffer.duplicate()),
          storageLevel,
          tellMaster = true)
    }

    // what is reported back to the driver: block id and record count
    BlockManagerBasedStoreResult(blockId, numRecords)
  }
}
```
WriteAheadLogBasedBlockHandler writes simultaneously to a WAL on reliable storage and to the executor's BlockManager; only after both writes complete is the block's meta information reported.

The copy in the BlockManager is preferred at computation time; the WAL is only read back when an executor fails.

As in other systems, data is written to the WAL strictly sequentially; the block meta information reported afterwards therefore additionally contains the path of the WAL holding the block, plus the offset and length within that WAL file.
```scala
class WriteAheadLogBasedBlockHandler(
    blockManager: BlockManager,
    serializerManager: SerializerManager,
    streamId: Int,
    storageLevel: StorageLevel,
    conf: SparkConf,
    hadoopConf: Configuration,
    checkpointDir: String,
    clock: Clock = new SystemClock) extends ReceivedBlockHandler {

  // timeout for storing a block
  val blockStoreTimeout = conf.getInt(
    "spark.streaming.receiver.blockStoreTimeout", 30).seconds

  // the write-ahead log
  val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
    conf,
    checkpointDirToLogDir(checkpointDir, streamId),
    hadoopConf)

  def storeBlock() {
    // 1. serialize the block for the BlockManager
    val serializedBlock = block match {...}

    // 2. store into the BlockManager and write to the WAL,
    //    both asynchronously via Futures
    val storeInBlockManagerFuture = Future {
      blockManager.putBytes(... serializedBlock)
    }
    val storeInWriteAheadLogFuture = Future {
      writeAheadLog.write(... serializedBlock)
    }

    // future1.zip(future2): combines two futures into a tuple future;
    // if either fails, the combined future fails
    val combinedFuture = storeInBlockManagerFuture
      .zip(storeInWriteAheadLogFuture)
      .map(_._2)
    val walRecordHandle = ThreadUtils.awaitResult(
      combinedFuture, blockStoreTimeout)

    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
  }
}
```
Each time a block finishes being stored on the executor, the ReceiverSupervisor promptly reports the block's meta information to the ReceiverTracker on the driver; this meta information includes the block id, the data's location, the number of records, and the size of the data.

Concretely, the ReceiverSupervisor reports the block id, location, record count, size, and so on to the driver:
```scala
class ReceiverSupervisorImpl {
  def pushAndReportBlock {
    val blockStoreResult = receivedBlockHandler.storeBlock(
      blockId,
      receivedBlock)
    val blockInfo = ReceivedBlockInfo(
      streamId,
      numRecords,
      metadataOption,
      blockStoreResult)
    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
  }
}
```
```scala
// reported to the driver by the WAL-based handler: blockId, record count, walRecordHandle
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)

// reported to the driver by the BlockManager-based handler: blockId and record count
BlockManagerBasedStoreResult(blockId, numRecords)
```
On the one hand, Receivers report meta information to the ReceiverTracker via AddBlock messages; on the other hand, JobGenerator asks the ReceiverTracker at the start of each batch to assign the reported blocks to that batch. The ReceiverTracker thus handles the management of block meta information.

Concretely, ReceiverTracker has a member, ReceivedBlockTracker, dedicated to managing the meta information of reported blocks.

At ssc.start(), ReceiverTracker.start() is implicitly called, and the most important thing ReceiverTracker.start() does is call its launchReceivers() method to distribute the Receivers to multiple executors. Then on each executor, a ReceiverSupervisor starts a Receiver to receive data.

Moreover, since version 1.5.0 ReceiverSchedulingPolicy computes the Receivers' destinations inside the Spark Streaming layer. Compared with the earlier approach of relying on Spark Core's generic TaskScheduler, the new ReceiverSchedulingPolicy has a better semantic understanding of the Streaming application and can compute a better placement.

In addition, by giving each Receiver its own dedicated Job, Receivers can be distributed repeatedly and restarted after failures, keeping them alive indefinitely.
ReceiverTracker:
An RpcEndpoint can be thought of as the server side of an RPC, with Netty providing the underlying transport, which clients can call.

The ReceiverTracker's RpcEndpoint address — i.e. the driver's address — is public, so Receivers can connect to it; once a Receiver connects successfully, the ReceiverTracker also holds that Receiver's RpcEndpoint. In this way, by exchanging messages, the two sides can communicate in both directions.

These messages are received without a reply. Except for the error-report message, which is sent by an executor, the rest are commands the driver-side tracker sends to itself; all of them are handled in ReceiverTracker.receive:
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
When the ReceiverTracker has just started, it sends itself this message, triggering the schedulingPolicy computation and the subsequent distribution.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(
        receivers, getExecutors)
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors)
      }
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
When the initially assigned executor turns out to be wrong, or the Receiver fails, the tracker sends itself this message to trigger a redistribution of the Receiver.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers => ...
    // restart after failure
    case RestartReceiver(receiver) =>
      // 1. get the previously scheduled executors
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // 2. compute new executor locations
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // 2.1 previous executors are still available, reuse them
        oldScheduledExecutors
      } else {
        // 2.2 otherwise recompute the locations
        schedulingPolicy.rescheduleReceiver()
      }
      // 3. send to the worker to restart the receiver
      startReceiver(receiver, scheduledLocations)
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
When block data has been processed and is no longer needed, the tracker sends itself this message and then forwards the CleanupOldBlocks message to all Receivers.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks =>
      receiverTrackingInfos.values
        .flatMap(_.endpoint)
        .foreach(_.send(c))
    case UpdateReceiverRateLimit => ...
    case ReportError             => ...
  }
}
```
When the ReceiverTracker dynamically computes a new rate limit for some Receiver, it sends that Receiver an UpdateRateLimit message.
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit =>
      for (info <- receiverTrackingInfos.get(streamUID);
           eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    case ReportError             => ...
  }
}
```
```scala
class ReceiverTracker {
  def receive: PartialFunction[Any, Unit] = {
    case StartAllReceivers       => ...
    case RestartReceiver         => ...
    case CleanupOldBlocks        => ...
    case UpdateReceiverRateLimit => ...
    case ReportError =>
      reportError(streamId, message, error)
  }
}
```
These messages are received from executors and, after being handled, a reply is sent back; they are handled in ReceiverTracker.receiveAndReply:
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case StopAllReceivers     => ...
  }
}
```
Sent by a Receiver while it is trying to start; the reply says whether it is allowed to start or not.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver() =>
      val successful = registerReceiver(
        streamId,
        typ,
        host,
        executorId,
        receiverEndpoint,
        context.senderAddress)
      context.reply(successful)
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
The actual block-meta report message, sent by a Receiver; the reply indicates success or failure.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock() =>
      context.reply(addBlock(receivedBlockInfo))
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
A local message, used during ReceiverTracker.stop() to check whether there are still active Receivers; the reply contains the ids of all Receivers that are still alive.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds =>
      context.reply(
        receiverTrackingInfos.filter(
          _._2.state != ReceiverState.INACTIVE
        ).keys.toSeq)
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
Queries the tracking information of all Receivers.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo =>
      context.reply(receiverTrackingInfos.toMap)
    case StopAllReceivers     => ...
  }
}
```
At the very beginning of ReceiverTracker.stop(), this asks all Receivers to stop; a StopReceiver message is sent to every Receiver and true is returned.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() => ...
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers =>
      assert(isTrackerStopping || isTrackerStopped)
      receiverTrackingInfos.values
        .flatMap(_.endpoint)
        .foreach { _.send(StopReceiver) }
      context.reply(true)
  }
}
```
Sent by a Receiver when it stops; after the deregistration is handled, true is returned in all cases.
```scala
class ReceiverTracker {
  def receiveAndReply(context: RpcCallContext) {
    case RegisterReceiver()   => ...
    case AddBlock()           => ...
    case DeregisterReceiver() =>
      deregisterReceiver(streamId, message, error)
      context.reply(true)
    case AllReceiverIds       => ...
    case GetAllReceiverInfo   => ...
    case StopAllReceivers     => ...
  }
}
```
The addBlock(receivedBlockInfo: ReceivedBlockInfo) method receives the block meta information reported by a Receiver and adds it to streamIdToUnallocatedBlockQueues.
```scala
class ReceivedBlockTracker {
  // meta info of blocks that have been reported but not yet allocated to a batch
  val streamIdToUnallocatedBlockQueues = new HashMap[Int, ReceivedBlockQueue]

  def addBlock(receivedBlockInfo: ReceivedBlockInfo) {
    // write to the WAL first
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        streamIdToUnallocatedBlockQueues
          .getOrElseUpdate(streamId, new ReceivedBlockQueue()) += receivedBlockInfo
      }
    }
  }
}
```
When JobGenerator kicks off a new batch, the contents of streamIdToUnallocatedBlockQueues are added to timeToAllocatedBlocks, keyed by the batchTime passed in, and lastAllocatedBatchTime is updated.
```scala
class ReceivedBlockTracker {
  // meta info of reported blocks that have been allocated to a batch,
  // indexed first by batch time and then by receiver (stream) id,
  // hence a HashMap: time -> (streamId -> blocks)
  val timeToAllocatedBlocks =
    new mutable.HashMap[Time, AllocatedBlocks] // AllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]

  // the most recent batch whose allocation has completed
  var lastAllocatedBatchTime: Time = null

  // collect all unallocated blocks into this batch
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    // sanity-check the time: it must be later than the last allocated batch
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      // drain the unallocated queues
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        // put into the allocated map
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        // update the timestamp of the last allocated batch
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    }
  }
}
```
When JobGenerator kicks off a new batch and DStreamGraph generates the RDD DAG instance, getBlocksOfBatch(batchTime: Time) is called to look up timeToAllocatedBlocks and obtain the meta information of the blocks assigned to this batch, from which the RDD that processes those blocks is generated.
```scala
class ReceivedBlockTracker {
  def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime)
      .map { _.streamIdToAllocatedBlocks }
      .getOrElse(Map.empty)
  }
}
```
Called when a batch has finished computing and the tracked block meta information can be cleaned up; a job clears from timeToAllocatedBlocks the meta information of all batches earlier than cleanupThreshTime.
```scala
class ReceivedBlockTracker {
  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    val timesToCleanup = timeToAllocatedBlocks.keys
      .filter { _ < cleanupThreshTime }.toSeq
    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      // remove the batches from the allocated map
      timeToAllocatedBlocks --= timesToCleanup
      // clean the WAL
      writeAheadLogOption.foreach(
        _.clean(cleanupThreshTime.milliseconds, waitForCompletion))
    }
  }
}
```
腦圖製做參考:https://github.com/lw-lin/CoolplaySpark
完整腦圖連接地址:https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png