第11課:Spark Streaming源碼解讀之Driver中的ReceiverTracker架構

本期內容:設計模式

1, ReceiverTracker的架構設計架構

2, 消息循環系統架構設計

3, ReceiverTracker具體實現設計

ReceiverTacker類以下,從源碼註釋能夠看出該類的做用。日誌

    管理ReceiverInputDStreams的執行,記錄Receiver發來的元數據信息。ReceiverTacker類構造時必須傳入StreamingContext對象。對象

    ReceiverTacker類內部有ReceiverTackerEndpoint這個消息通訊體,用於和運行在Executor端的ReceiverSupervisorImpl進行通訊,包括Receiver的註冊,重啓Receiver,清除以前的Block數據,更新限流值,添加Block元數據信息等消息。get

    接下來以接收到來自Executor端的ReceiverSupervisorImpl發來添加元數據信息的AddBlock消息,進行講解具體的處理流程。源碼

        

ReceivedBlockInfo類包含了StreamID,Block中記錄條數,元數據Metadata,接收Block的存儲結果(BlockID和記錄數量)it

        

ReceiverBlockTracker類是addBlock方法的具體實現。table

        1.調用ReceiverBlockTracker的writeToLog方法

        2.調用ReceiverBlockTracker的getReceivedBlockQueue方法,其中streamIdToUnallocatedBlockQueues爲HashMap,Key爲StreamID,Value爲ReceivedBlockQueue。而ReceivedBlockQueue 的定義爲private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

        ReceiverBlockTracker類,能夠從源碼中看出,他會記錄全部接收到的Block信息,根據須要把Block分配給Batch。若是設置了checkpoint,開啓WAL,則會把全部的操做保存到預寫日誌中,所以當Driver失敗後就能夠從checkpoint和WAL中恢復ReceiverTracker的狀態。

        ReceiverBlockTracker類中重要的方法,allocateBlocksToBatch。private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]存儲批處理時刻,分配到的Blocks數據。

        該方法是被ReceiverTracker調用的。

        而ReceiverTracker的allocateBlocksToBatch方法是被JobGenerator的generateJobs方法調用的。

        

ReceiverBlockTracker類中重要的方法,getBlocksOfBatch。

       

 該方法是被ReceiverTracker的getBlocksOfBatch調用。

       

ReceiverTracker的getBlocksOfBatch方法是被ReceiverInputDStream的compute方法調用的。

總結:

Receiver接收到數據,而後合併並存儲數據以後,ReceiverSupervisorImpl會把Block的元數據彙報給ReceiverTracker內部的消息通訊體ReceiverTrackerEndpoint。

ReceiverTracker接收到Block的元數據信息以後,由ReceivedBlockTracker管理Block的元數據的分配,JobGenerator會將每一個Batch,從ReceivedBlockTracker中獲取屬於該Batch的Block元數據信息來生成RDD。

從設計模式來說:ReceiverTrackerEndpoint和ReceivedBlockTracker是門面設計模式,內部實際幹事情的是ReceivedBlockTracker,外部通訊體或者表明者就是ReceiverTrackerEndpoint。

相關文章
相關標籤/搜索