經過案例對SparkStreaming 透徹理解三板斧之一:解密SparkStreaming運行機制

1 解密Spark Streaming Job架構和運行機制1 解密Spark Streaming Job架構和運行機制sql

  第一部分經過案例透視Job的執行過程,案例代碼以下:apache

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 
  *
  * @author DT大數據夢工廠
  * 新浪微博:http://weibo.com/ilovepains/
  *
  * 背景描述:在廣告點擊計費系統中,咱們在線過濾掉黑名單的點擊,進而保護廣告商的利益,只進行有效的廣告點擊計費
  * 	或者在防刷評分(或者流量)系統,過濾掉無效的點擊或者評分或者流量;
  * 實現技術:使用transform Api直接基於RDD編程,進行join操做
  *
  */
object OnlineForeachRDD2DB {
  def main(args: Array[String]){
    /**
      * 第1步:建立Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,
      * 例如說經過setMaster來設置程序要連接的Spark集羣的Master的URL,若是設置
      * 爲local,則表明Spark程序在本地運行,特別適合於機器配置條件很是差(例如
      * 只有1G的內存)的初學者       *
      */
    val conf = new SparkConf() //建立SparkConf對象
    conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱
//    conf.setMaster("spark://Master:7077") //此時,程序在Spark集羣
    conf.setMaster("local[6]")
    //設置batchDuration時間間隔來控制Job生成的頻率而且建立Spark Streaming執行的入口
    val ssc = new StreamingContext(conf, Seconds(5))


    val lines = ssc.socketTextStream("Master", 9999)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords => {
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
          val stmt = connection.createStatement();
          stmt.executeUpdate(sql);

        })
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse

      }

      }
    }


    /**
      * 在StreamingContext調用start方法的內部實際上是會啓動JobScheduler的Start方法,進行消息循環,在JobScheduler
      * 的start內部會構造JobGenerator和ReceiverTacker,而且調用JobGenerator和ReceiverTacker的start方法:
      *   1,JobGenerator啓動後會不斷的根據batchDuration生成一個個的Job
      *   2,ReceiverTracker啓動後首先在Spark Cluster中啓動Receiver(實際上是在Executor中先啓動ReceiverSupervisor),在Receiver收到
      *   數據後會經過ReceiverSupervisor存儲到Executor而且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker
      *   內部會經過ReceivedBlockTracker來管理接受到的元數據信息
      * 每一個BatchInterval會產生一個具體的Job,其實這裏的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD
      * 的DAG而已,從Java角度講,至關於Runnable接口實例,此時要想運行Job須要提交給JobScheduler,在JobScheduler中經過線程池的方式找到一個
      * 單獨的線程來提交Job到集羣運行(實際上是在線程中基於RDD的Action觸發真正的做業的運行),爲何使用線程池呢?
      *   1,做業不斷生成,因此爲了提高效率,咱們須要線程池;這和在Executor中經過線程池執行Task有殊途同歸之妙;
      *   2,有可能設置了Job的FAIR公平調度的方式,這個時候也須要多線程的支持;
      *
      */
    ssc.start()
    ssc.awaitTermination()

  }
}

2 解密Spark Streaming 容錯架構和運行機制編程

任務容錯安全

咱們知道DStream與RDD的關係就是隨着時間流逝不斷的產生RDD,對DStream的操做就是在固定時間上操做RDD。因此從某種意義上而言,Spark Streaming的基於DStream的容錯機制,實際上就是劃分到每一次造成的RDD的容錯機制,這也是Spark Streaming的高明之處。RDD做爲 分佈式彈性數據集,它的彈性主要體如今:多線程

1.自動的分配內存和硬盤,優先基於內存架構

2.基於lineage容錯機制socket

3.task會指定次數的重試分佈式

4.stage失敗會自動重試大數據

5.checkpoint和persist 複用spa

6.數據調度彈性:DAG,TASK和資源管理無關。

7.數據分片的高度彈性

  基於RDD的特性,它的容錯機制主要就是兩種:一是checkpoint,二是基於lineage(血統)的容錯。通常而言,spark選擇血統容錯,由於對於大規模的數據集,作檢查點的成本很高。可是有的狀況下,不如說lineage鏈條過於複雜和冗長,這時候就須要作checkpoint。

  考慮到RDD的依賴關係,每一個stage內部都是窄依賴,此時通常基於lineage容錯,方便高效。在stage之間,是寬依賴,產生了shuffle操做,這種狀況下,作檢查點則更好。總結來講,stage內部作lineage,stage之間作checkpoint。

2.任務安全性

1.Driver容錯 

a)每一個job生成以前作一個checkpoint,生成以後再作一個checkpoint,若是出錯,就從checkpoint中恢復。 

 

2. Executor 容錯:

a)接收數據的安全性:sparkStreaming接收到數據默認的存儲方式是memory_and_disk_ser_2 的方式; wal方式(writer ahead log),在數據到來的時候,先作記錄,而後再存到executor,

b)執行的安全性

徹底基於RDD容錯 ,

RDD可以記住每一個轉換操做,對應於Lineage圖中的一個步驟,恢復丟失分區數據時不須要寫日誌記錄大量數據
相關文章
相關標籤/搜索