內容:網絡
1,解密Spark Streaming運行機制架構
2,解密Spark Streaming架構併發
DStream是邏輯級別的,而RDD是物理級別的。DStream是隨着時間的流動內部將集合封裝RDD。對DStream的操做,轉過來對其內部的RDD操做。socket
縱軸爲空間維度:表明的是RDD的依賴關係構成的具體的處理邏輯的步驟,是用DStream來表示的。oop
橫軸爲時間維度:按照特定的時間間隔不斷地生成job對象,並在集羣上運行。spa
隨着時間的推移,基於DStream Graph 不斷生成RDD Graph ,也即DAG的方式生成job,並經過Job Scheduler的線程池的方式提交給spark cluster不斷的執行。線程
由上可知,RDD 與 DStream的關係以下對象
RDD是物理級別的,而 DStream 是邏輯級別的繼承
DStream是RDD的封裝類,是RDD進一步的抽象隊列
DStream 是RDD的模板。DStream要依賴RDD進行具體的數據計算
注意:縱軸維度須要RDD,DAG的生成模板,須要TimeLine的job控制器
橫軸維度(時間維度)包含batch interval,窗口長度,窗口滑動時間等。
3,Spark Streaming源碼解析
StreamingContext方法中調用JobScheduler的start方法
JobGenerator的start方法中,調用startFirstTime方法,來開啓定時生成Job的定時器
startFirstTime方法,首先調用DStreamGraph的start方法,而後再調用RecurringTimer的start方法。
timer對象爲一個定時器,根據batchInterval時間間隔按期向EventLoop發送GenerateJobs的消息。
接收到GenerateJobs消息後,會回調generateJobs方法。
generateJobs方法再調用DStreamGraph的generateJobs方法生成Job
DStreamGraph的generateJobs方法
DStreamGraph的實例化是在StreamingContext中的
DStreamGraph類中保存了輸入流和輸出流信息
回到JobGenerator的start方法中receiverTracker.start()
其中ReceiverTrackerEndpoint對象爲一個消息循環體
launchReceivers方法中發送StartAllReceivers消息
接收到StartAllReceivers消息後,進行以下處理
StartReceiverFunc方法以下,實例化Receiver監控者,開啓並等待退出
supervisor的start方法中調用startReceiver方法
咱們以socketTextStream爲例,其啓動的是SocketReceiver,內部開啓一個線程,來接收數據。
內部調用supervisor的pushSingle方法,將數據彙集後存放在內存中
supervisor的pushSingle方法以下,將數據放入到defaultBlockGenerator中,defaultBlockGenerator爲BlockGenerator,保存Socket接收到的數據
BlockGenerator對象中有一個定時器,來更新當前的Buffer
BlockGenerator對象中有一個線程,來從阻塞隊列中取出數據
調用ReceiverSupervisorImpl類中的繼承BlockGeneratorListener的匿名類中的onPushBlock方法。
receivedBlockHandler對象以下
這裏咱們講解BlockManagerBasedBlockHandler的方式
trackerEndpoint以下
實際上是發送給ReceiverTrackerEndpoint類,
InputInfoTracker類的reportInfo方法只是對數據進行記錄統計
其generateJob方法是被DStreamGraph調用
DStreamGraph的generateJobs方法是被JobGenerator類的generateJobs方法調用。
JobGenerator類中有一個定時器,batchInterval發送GenerateJobs消息
總結:
1,當調用StreamingContext的start方法時,啓動了JobScheduler
2,當JobScheduler啓動後會前後啓動ReceiverTracker和JobGenerator
3,ReceiverTracker啓動後會建立ReceiverTrackerEndpoint這個消息循環體,來接收運行在Executor上的Receiver發送過來的消息
4,ReceiverTracker在啓動時會給本身發送StartAllReceivers消息,本身接收到消息後,向Spark提交startReceiverFunc的Job
5,startReceiverFunc方法中在Executor上啓動Receiver,並實例化ReceiverSupervisorImpl對象,來監控Receiver的運行
6,ReceiverSupervisorImpl對象會調用Receiver的onStart方法,咱們以SocketReceiver爲例,啓動一個線程,鏈接Server,讀取網絡數據先調用ReceiverSupervisorImpl的pushSingle方法,
保存在BlockGenerator對象中,該對象內部有個定時器,放到阻塞隊列blocksForPushing,等待內部線程取出數據放到BlockManager中,併發AddBlock消息給ReceiverTrackerEndpoint。
ReceiverTrackerEndpoint爲ReceiverTracker的內部類,在接收到addBlock消息後將streamId對應的數據阻塞隊列streamIdToUnallocatedBlockQueues中
7,JobGenerator啓動後會啓動以batchInterval時間間隔發送GenerateJobs消息的定時器
8,接收到GenerateJobs消息會前後觸發ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法
9,ReceiverTracker的allocateBlocksToBatch方法會調用getReceivedBlockQueue方法從阻塞隊列streamIdToUnallocatedBlockQueues中根據streamId獲取數據
10,DStreamGraph的generateJobs方法,繼而調用變量名爲outputStreams的DStream集合的generateJob方法
11,繼而調用DStream的getOrCompute來調用具體的DStream的compute方法,咱們以ReceiverInputDStream爲例,compute方法是從ReceiverTracker中獲取數據