第2課:經過案例對SparkStreaming 透徹理解三板斧之二:解密SparkStreaming

內容:網絡

1,解密Spark Streaming運行機制架構

2,解密Spark Streaming架構併發

DStream是邏輯級別的,而RDD是物理級別的。DStream是隨着時間的流動內部將集合封裝RDD。對DStream的操做,轉過來對其內部的RDD操做。socket

縱軸爲空間維度:表明的是RDD的依賴關係構成的具體的處理邏輯的步驟,是用DStream來表示的。oop

橫軸爲時間維度:按照特定的時間間隔不斷地生成job對象,並在集羣上運行。spa

隨着時間的推移,基於DStream Graph 不斷生成RDD Graph ,也即DAG的方式生成job,並經過Job Scheduler的線程池的方式提交給spark cluster不斷的執行。線程

由上可知,RDD    與  DStream的關係以下對象

RDD是物理級別的,而 DStream 是邏輯級別的繼承

DStream是RDD的封裝類,是RDD進一步的抽象隊列

DStream 是RDD的模板。DStream要依賴RDD進行具體的數據計算

注意:縱軸維度須要RDD,DAG的生成模板,須要TimeLine的job控制器

橫軸維度(時間維度)包含batch interval,窗口長度,窗口滑動時間等。

3,Spark Streaming源碼解析

StreamingContext方法中調用JobSchedulerstart方法

JobGenerator的start方法中,調用startFirstTime方法,來開啓定時生成Job的定時器

startFirstTime方法,首先調用DStreamGraph的start方法,而後再調用RecurringTimer的start方法。

timer對象爲一個定時器,根據batchInterval時間間隔按期向EventLoop發送GenerateJobs的消息。

接收到GenerateJobs消息後,會回調generateJobs方法。

generateJobs方法再調用DStreamGraph的generateJobs方法生成Job

DStreamGraph的generateJobs方法

DStreamGraph的實例化是在StreamingContext中的

DStreamGraph類中保存了輸入流和輸出流信息

回到JobGenerator的start方法中receiverTracker.start()

其中ReceiverTrackerEndpoint對象爲一個消息循環體

launchReceivers方法中發送StartAllReceivers消息

接收到StartAllReceivers消息後,進行以下處理

StartReceiverFunc方法以下,實例化Receiver監控者,開啓並等待退出

supervisor的start方法中調用startReceiver方法

咱們以socketTextStream爲例,其啓動的是SocketReceiver,內部開啓一個線程,來接收數據。

內部調用supervisor的pushSingle方法,將數據彙集後存放在內存中

supervisor的pushSingle方法以下,將數據放入到defaultBlockGenerator中,defaultBlockGenerator爲BlockGenerator,保存Socket接收到的數據

BlockGenerator對象中有一個定時器,來更新當前的Buffer

BlockGenerator對象中有一個線程,來從阻塞隊列中取出數據

調用ReceiverSupervisorImpl類中的繼承BlockGeneratorListener的匿名類中的onPushBlock方法。

receivedBlockHandler對象以下

這裏咱們講解BlockManagerBasedBlockHandler的方式

trackerEndpoint以下

實際上是發送給ReceiverTrackerEndpoint類,

InputInfoTracker類的reportInfo方法只是對數據進行記錄統計

其generateJob方法是被DStreamGraph調用

DStreamGraph的generateJobs方法是被JobGenerator類的generateJobs方法調用。

JobGenerator類中有一個定時器,batchInterval發送GenerateJobs消息

總結:

1,當調用StreamingContext的start方法時,啓動了JobScheduler

2,當JobScheduler啓動後會前後啓動ReceiverTracker和JobGenerator

3,ReceiverTracker啓動後會建立ReceiverTrackerEndpoint這個消息循環體,來接收運行在Executor上的Receiver發送過來的消息

4,ReceiverTracker在啓動時會給本身發送StartAllReceivers消息,本身接收到消息後,向Spark提交startReceiverFunc的Job

5,startReceiverFunc方法中在Executor上啓動Receiver,並實例化ReceiverSupervisorImpl對象,來監控Receiver的運行

6,ReceiverSupervisorImpl對象會調用Receiver的onStart方法,咱們以SocketReceiver爲例,啓動一個線程,鏈接Server,讀取網絡數據先調用ReceiverSupervisorImpl的pushSingle方法,

保存在BlockGenerator對象中,該對象內部有個定時器,放到阻塞隊列blocksForPushing,等待內部線程取出數據放到BlockManager中,併發AddBlock消息給ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint爲ReceiverTracker的內部類,在接收到addBlock消息後將streamId對應的數據阻塞隊列streamIdToUnallocatedBlockQueues中

7,JobGenerator啓動後會啓動以batchInterval時間間隔發送GenerateJobs消息的定時器

8,接收到GenerateJobs消息會前後觸發ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法

9,ReceiverTracker的allocateBlocksToBatch方法會調用getReceivedBlockQueue方法從阻塞隊列streamIdToUnallocatedBlockQueues中根據streamId獲取數據

10,DStreamGraph的generateJobs方法,繼而調用變量名爲outputStreams的DStream集合的generateJob方法

11,繼而調用DStream的getOrCompute來調用具體的DStream的compute方法,咱們以ReceiverInputDStream爲例,compute方法是從ReceiverTracker中獲取數據

相關文章
相關標籤/搜索