本期內容:設計模式
1, ReceiverTracker的架構設計架構
2, 消息循環系統架構設計
3, ReceiverTracker具體實現設計
ReceiverTacker類以下,從源碼註釋能夠看出該類的做用。日誌
管理ReceiverInputDStreams的執行,記錄Receiver發來的元數據信息。ReceiverTacker類構造時必須傳入StreamingContext對象。對象
ReceiverTacker類內部有ReceiverTackerEndpoint這個消息通訊體,用於和運行在Executor端的ReceiverSupervisorImpl進行通訊,包括Receiver的註冊,重啓Receiver,清除以前的Block數據,更新限流值,添加Block元數據信息等消息。get
接下來以接收到來自Executor端的ReceiverSupervisorImpl發來添加元數據信息的AddBlock消息,進行講解具體的處理流程。源碼
ReceivedBlockInfo類包含了StreamID,Block中記錄條數,元數據Metadata,接收Block的存儲結果(BlockID和記錄數量)it
ReceiverBlockTracker類是addBlock方法的具體實現。table
1.調用ReceiverBlockTracker的writeToLog方法
2.調用ReceiverBlockTracker的getReceivedBlockQueue方法,其中streamIdToUnallocatedBlockQueues爲HashMap,Key爲StreamID,Value爲ReceivedBlockQueue。而ReceivedBlockQueue 的定義爲private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
ReceiverBlockTracker類,能夠從源碼中看出,他會記錄全部接收到的Block信息,根據須要把Block分配給Batch。若是設置了checkpoint,開啓WAL,則會把全部的操做保存到預寫日誌中,所以當Driver失敗後就能夠從checkpoint和WAL中恢復ReceiverTracker的狀態。
ReceiverBlockTracker類中重要的方法,allocateBlocksToBatch。private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]存儲批處理時刻,分配到的Blocks數據。
該方法是被ReceiverTracker調用的。
而ReceiverTracker的allocateBlocksToBatch方法是被JobGenerator的generateJobs方法調用的。
ReceiverBlockTracker類中重要的方法,getBlocksOfBatch。
該方法是被ReceiverTracker的getBlocksOfBatch調用。
ReceiverTracker的getBlocksOfBatch方法是被ReceiverInputDStream的compute方法調用的。
總結:
Receiver接收到數據,而後合併並存儲數據以後,ReceiverSupervisorImpl會把Block的元數據彙報給ReceiverTracker內部的消息通訊體ReceiverTrackerEndpoint。
ReceiverTracker接收到Block的元數據信息以後,由ReceivedBlockTracker管理Block的元數據的分配,JobGenerator會將每一個Batch,從ReceivedBlockTracker中獲取屬於該Batch的Block元數據信息來生成RDD。
從設計模式來說:ReceiverTrackerEndpoint和ReceivedBlockTracker是門面設計模式,內部實際幹事情的是ReceivedBlockTracker,外部通訊體或者表明者就是ReceiverTrackerEndpoint。