Spark版本定製一:經過案例對SparkStreaming透徹理解三板斧之一

本期內容:算法

1 Spark Streaming另類在線實驗數據庫

2 瞬間理解Spark Streaming本質瀏覽器

問:爲何從Spark Streaming來切入spark源碼版本訂製?
  1. Spark最開始的時候並無Spark Streaming、Spark Sql、Spark ML、Spark R、Spark Graphx等相關的內容,就是很原始的Spark Core,Spark Streaming自己是Spark Core上的一個框架,透過一個框架的完全研究能夠完全精通spark的方方面面;
  2. Spark用的最多的除了Spark Core以外,就是Spark Sql,Spark Sql因爲涉及了太多的Sql語法細節的解析或優化,不適合做爲一個具體的子框架來完全研究Spark,而Spark R因爲功能有限和不成熟,也排除掉,Spark Graphx最近發行的幾個版本基本上沒有改進,意味着Graphx基本上發展到盡頭了,另外圖計算有 不少數學計算算法,而Spark ML再封裝了Vector(向量)、Matrix(矩陣)以及結合RDD構建了衆多的庫,ML也有不少的數學知識,因此綜合下來選擇了Spark Streaming來入手訂製Spark版本;
  3. Spark Streaming是Spark工程師中應用最廣、市場最好、最有吸引力的技術;
問:spark streaming到底有什麼樣的魔力呢?
  1. spark streaming是流式計算,這是一個流處理的時代,一切數據若是不是流失處理或者與流失處理不相關的話都是無效的數據;
  2. 流式處理纔是真正對大數據的初步印象,數據流進來,立馬給出反饋,而不是批處理、數據挖掘,固然它最強悍的地方,在於Spark Streaming能夠在線的使用機器學習的成果或者圖計算的結果或者Spark Sql、Spark R的結果,這是Spark一統江湖的根源;
  3. Spark的程序中,Spark Streaming是最容易出問題的!由於數據是不斷的流進來,它要動態控制數據的流入、做業的切分還有數據的處理,最容易出錯的地方,也是最容易體現我的價值的地方;
  4. Spark Streaming很像Spark Core之上的一個應用程序,若是精通了Spark Streaming,對於任何Spark問題都不在話下;
  5. Spark Streaming贏在轉折點的趨勢點上;它能感知程序下一步要作什麼;
 

1、Spark Streaming另類在線實驗

案例說明:
      廣告點擊的在線黑名單過濾:在廣告點擊計費系統中在線過濾掉黑名單的點擊,進而保護廣告商的利益 只有效的廣告點擊計費
實驗技巧:
      實戰中咱們通常會把Batch Interval設置的很小好比5s 10s 30s等,這裏咱們採用放大Batch Interval的技巧,把Batch Interval設置爲300s,放大後對於job執行沒有任何影響,可是更利於觀察數據的流入、Job執行的過程等;
 
實驗代碼:
object OnlineBlackListFilter {
    def main(args: Array[String]){
      /**
       * 第1步:建立Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,例如說經過setMaster來設置程序要連接的Spark集羣的Master的URL,若是設置
       * 爲local,則表明Spark程序在本地運行,特別適合於機器配置條件很是差(例如只有1G的內存)的初學者
       */
      val conf = new SparkConf() //建立SparkConf對象
      conf.setAppName("OnlineBlackListFilter") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱
      conf.setMaster("spark://Master:7077") //此時,程序在Spark集羣
 
 
      //val ssc = new StreamingContext(conf, Seconds(30))
      //由30s改成300s
      val ssc = new StreamingContext(conf, Seconds(300))
 
      /**
       * 黑名單數據準備,實際上黑名單通常都是動態的,例如在Redis或者數據庫中,黑名單的生成每每有複雜的業務邏輯,具體狀況算法不一樣,可是在Spark Streaming進行處理的時候每次都能工訪問完整的信息
       */
      val blackList = Array(("hadoop", true),("mahout", true))
      val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
 
      val adsClickStream = ssc.socketTextStream("Master", 9999)
 
      /**
       * 此處模擬的廣告點擊的每條數據的格式爲:time、name 此處map操做的結果是name、(time,name)的格式
       */
      val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
      adsClickStreamFormatted.transform(userClickRDD => {
        //經過leftOuterJoin操做既保留了左側用戶廣告點擊內容的RDD的全部內容,又得到了相應點擊內容是否在黑名單中
        val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
 
        /**
         * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean))
         * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在在值
         * 若是存在的話,表面當前廣告點擊是黑名單,須要過濾掉,不然的話則是有效點擊內容;
         */
        val validClicked = joinedBlackListRDD.filter(joinedItem => {
          if(joinedItem._2._2.getOrElse(false))
          {
            false
          } else {
            true
          }
        })
 
        validClicked.map(validClick => {validClick._2._1})
      }).print
 
      /**
       * 計算後的有效數據通常都會寫入Kafka中,下游的計費系統會從kafka中pull到有效數據進行計費
       */
      ssc.start()
      ssc.awaitTermination()
    }
}

  

實驗步驟:
      一、啓動spark集羣和spark history server進程(查看job的執行軌跡);
      二、在Master節點執行命令nc -lk 9999啓動數據發送服務執行腳本SparkStreamingApps.sh啓動job;
       三、觀察執行結果:           
                在數據發送端口輸入若干數據,好比:
                2255554 Spark
                455554444 Hadoop
                55555 Flink
                66666 Kafka
                6666855 RockySpark
                666638 Scala
                66666 DT_Spark
                經過瀏覽器查看Spark History Server信息:                
                
                點開第一個Job:
                Job0內幕:
                 
                    這邊有5個完成的Job,咱們實際執行的是一個Job,這裏咱們能夠分析出不少內幕,5個job從上到下分別Revicer、print、print、print、start,點擊進入start對應的DAG可視化視圖:
                   
                    從 Job0對應的DAG圖和案例代碼中能夠看出此Job並非咱們對應的業務邏輯代碼,從而能夠得出以下結論:
                     Spark Streaming在運行的過程當中,本身會啓動一些job,好比咱們點擊Stage id下的Stage1進去,查看 Aggregated Metrics by Executor部分
 
                    
                   
                       發現4個節點上都Job運行,最大化的利用了集羣的資源,也充分說明了Spark Streaming就是一個應用程序;
             
                   Job1內幕:
              
                   
                   
                  問題:咱們的應用程序沒有1.5min中的Job,那爲何有1.5min中的Task呢?
                          是數據接收器Receiver,一直不斷循環的接收數據,因此須要持續的運行!Receiver就是一個Job!Receiver經過Job啓動!Receiver運行在Executor上,以一個Task運行,Receiver接收數據和普通的Job沒有區別,
                          這給咱們的啓發是:Spark Application中能夠啓動不少的Job,不一樣的Job能夠相互配合,爲咱們寫複雜的應用程序奠基了良好的基礎;而複雜的程序必定是有多個Job構成的!
 
                  從Tasks中的圖中能夠看到Locallty Level是PROCESS_LOCAL,沒有從節點,Spark Streaming接收數據的方式,默認是MEMORY_AND_DISK_SER_2的
                  方式,再次說明數據量比較少,數據接收不會使用磁盤,而是直接使用內存中的數據;
 
                 Job2內幕:
                 
               
                 
                   從圖中能夠看出Job2主要負責執行程序的業務邏輯代碼!Task分散在各個Executor上,充分利用了集羣的資源!
 

2、 瞬間理解Spark Streaming本質

       

       圖中的描述從上到下依次以下:微信

             Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各類來源的實時輸入數據,進行處理後,處理結果保存在HDFS、Databases等各類地方。框架

             Spark Streaming接收這些實時輸入數據流,會將它們按批次劃分,而後交給Spark引擎處理,生成按照批次劃分的結果流。機器學習

             Spark Streaming提供了表示連續數據流的、高度抽象的被稱爲離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操做都會轉變爲對底層RDD的操做。socket

             Spark Streaming使用數據源產生的數據流建立DStream,也能夠在已有的DStream上使用一些操做來建立新的DStream。分佈式

       

       DStream特性:oop

              DStream是一個沒有邊界的集合,沒有大小的限制。學習

             DStream表明了時空的概念。隨着時間的推移,裏面不斷產生RDD。

              鎖定到時間段後,就是空間的操做。也就是對本時間段的對應批次的數據的處理

 

      下面介紹Spark Streaming內部實現原理:

      Spark Streaming程序轉換爲DStream Graph

      

         使用Spark Streaming編寫的程序與編寫Spark程序很是類似,在Spark程序中,主要經過操做RDD(Resilient Distributed Datasets彈性分佈式數據集)提供的接口,如map、reduce、filter等,實現數據的批處理。

         而在Spark Streaming中,則經過操做DStream(表示數據流的RDD序列)提供的接口,這些接口和RDD提供的接口相似。

 

    DStream Graph轉換爲Spark jobs

        

Spark Streaming把程序中對DStream的操做轉換爲DStream Graph,對於每一個時間片,DStream Graph都會產生一個RDD Graph;針對每一個輸出操做(如print、foreach等),Spark Streaming都會建立一個Spark action;對於每一個Spark action,Spark Streaming都會產生一個相應的Spark job,並交給JobManager。JobManager中維護着一個Jobs隊列, Spark job存儲在這個隊列中,JobManager把Spark job提交給Spark Scheduler,Spark Scheduler負責調度Task到相應的Spark Executor上執行。

Spark Streaming的另外一大優點在於其容錯性,RDD會記住建立本身的操做,每一批輸入數據都會在內存中備份,若是因爲某個結點故障致使該結點上的數據丟失,這時能夠經過備份的數據在其它結點上重算獲得最終的結果。

正如Spark Streaming最初的目標同樣,它經過豐富的API和基於內存的高速計算引擎讓用戶能夠結合流式處理,批處理和交互查詢等應用。所以Spark Streaming適合一些須要歷史數據和實時數據結合分析的應用場合。固然,對於實時性要求不是特別高的應用也能徹底勝任。另外經過RDD的數據重用機制能夠獲得更高效的容錯處理。

 

本章總結:

1.Spark Streaming自己很像Spark Core之上的一個應用程序,內部經過調用Spark Core的RDD接口對數據進行處理;

2.Spark Application中能夠啓動不少的Job,不一樣的Job能夠相互配合,從而構建出複雜的、大型的應用程序;

3.DStream內部是轉換爲一系列的RDD去運行;

 

特別感謝王家林老師的獨具一格的講解:

王家林老師名片:

中國Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公衆號:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

QQ:1740415547

YY課堂:天天20:00現場授課頻道68917580

相關文章
相關標籤/搜索