SparkStreaming(源碼閱讀十二)

  要完整去學習spark源碼是一件很是不容易的事情,可是咱能夠聚沙成塔嘛~那麼,Spark Streaming是怎麼搞的呢?緩存

  本質上,SparkStreaming接收實時輸入數據流並將它們按批次劃分,而後交給Spark引擎處理生成按照批次劃分的結果流oop

  

  SparkStreaming提供了表示連續數據流的、高度抽象的被稱爲離散流的Dstream,可使用kafka、Flume和Kiness這些數據源的輸入數據流建立Dstream,也能夠在其餘Dstream上使用map、reduce、join、window等操做建立Dsteram。Dstream本質上呢,是表示RDD的序列源碼分析

  Spark Streaming首先將數據切分爲必定時間範圍(Duration)的數據集,而後積累一批(Batch)Duration數據集後單獨啓動一個任務線程處理。Spark核心提供的從DAG從新調度任務和並行執行,可以快速完成數據從故障中恢復的工做。學習

  那麼下來就從SparkStreaming 的StreamingContext初始化開始:spa

  StreamingContext傳入的參數:一、SparkContext也就是說Spark Streaming的最終處理實際是交給SparkContext。二、Checkpoint:檢查點.三、Duration:設定streaming每一個批次的積累時間。固然,也能夠不用設置檢查點。線程

  Dstream是Spark Streaming中全部數據流的抽象,這裏對抽象類Dstream定義的一些主要方法:對象

  一、dependencies:Dstream依賴的父級Dstream列表。blog

  二、comput(validTime:Time):指定時間生成一個RDD。接口

  三、isInitialized:Dstream是否已經初始化。事件

  四、persist(level:StorageLevel):使用指定的存儲級別持久化Dstream的RDD。

  五、persist:存儲到內存

  六、cache:緩存到內存,與persisit方法同樣。

  (這裏詳細說下cache與persist的不一樣點:cache只有一個默認的緩存級別MEMORY_ONLY ,而persist能夠根據狀況設置其它的緩存級別。)

  七、checkpoint(interval:Duration):設置Dstream及祖宗Dstream的DstreamGraph;

  八、getOrCompute(time:Time):從緩存generatedRDDs = new HashMap[Time,RDD[T]]中獲取RDD,若是緩存不存在,則生成RDD並持久化、設置檢查點放入緩存。

  九、generateJob(time:Time):給指定的Time對象生成Job.

  十、window(windowDuration:Duration):基於原有的Dstream,返回一個包含了全部在時間滑動窗口中可見元素的新的Dstream.

  ......

  Dsteam本質上是表示連續的一些列的RDD,Dstream中的每一個RDD包含了必定間隔的數據,任何對Dstream的操做都會轉化爲底層RDD的操做。在Spark Streaming中,Dstream提供的接口與RDD提供的接口很是類似。構建完ReciverInputDStream後,會調用各類Dstream的接口方法,對Dstream進行各類轉換,最後各個Dstream之間的依賴關係就造成了一張DStream Graph:

  整個流程所涉及的組件爲:

  一、Reciever:Spark Streaming內置的輸入流接收器或用戶自定義的接收器,用於從數據源接收源源不斷的數據流。

  二、currentBuffer:用於緩存輸入流接收器接收的數據流。

  三、blockIntervalTimer:一個定時器,用於將CurrentBuffer中緩存的數據流封裝爲Block後放入blocksForPushing。

  四、blockForPushing:用於緩存將要使用的Block。

  五、blockPushingThread:此線程每隔100毫秒從blocksForPushing中取出一個Block存入存儲體系,並緩存到ReceivedBlockQueue。

  六、Block Batch:Block批次,按照批次時間間隔,從RecievedBlockQueue中獲取一批Block。

  七、JobGenerator:Job生成器,用於給每一批Blcok生成一個Job。

   下來繼續回到StreamingContext,在StreamingContext中new了一個JobScheduler,它裏面創了EventLoop,對這個名字是否是很熟悉?沒錯,就是在Netty通訊交互時建立的對象,主要用於處理JobSchedular的事件。而後啓動StrreamingListenerBus,用於更新Spark UI中的StreamTab的內容。 那麼最重要的就是下來建立ReceiverTracker,它用於處理數據接收、數據緩存、Block生成等工做。最後啓動JobGenerator,負責對DstreamGraph的初始化Dstream與RDD的轉換生成JOB提交執行等工做。

  

  曾經是用ReciverTrackerActor接收來自Reciver的消息,包括RegisterReceiver、AddBlock、ReportError、DeregisterReceiver等,如今再也不使用Akka進行通訊,而是使用RPC。

  回到launchReceivers,調用了SparkContext的makeRDD方法,將全部Receiver封裝爲ParallelCollectionRDD,並行度是receivers的數量,makeRDD方法實際調用了parallelize:

  

  今天到此爲止。。明天再來會你這磨人的小妖精,玩別的去啦~~~

 

參考文獻:《深刻理解Spark:核心思想與源碼分析》

相關文章
相關標籤/搜索