spark版本定製三:SparkStreaming 透徹理解三板斧之三:解密SparkStreaming運行機制和架構進階之Job和容錯

本期內容:sql

一、解密Spark Streaming Job架構和運行機制微信

二、解密Spark Streaming 容錯架構和運行機制多線程

 

1、解密Spark Streaming Job架構和運行機制

經過代碼洞察Job的執行過程:架構

object OnlineForeachRDD2DB {
  def main(args: Array[String]){
    /*
      * 第1步:建立Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息
      */
    val conf = new SparkConf() //建立SparkConf對象
    conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱
//    conf.setMaster("spark://Master:7077") //此時,程序在Spark集羣
    conf.setMaster("local[6]")
 
    //設置batchDuration時間間隔來控制Job生成的頻率而且建立Spark Streaming執行的入口
    val ssc = new StreamingContext(conf, Seconds(5))
  
    val lines = ssc.socketTextStream("Master", 9999)
  
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords => {
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
          val stmt = connection.createStatement();
          stmt.executeUpdate(sql);
  
        })
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

經過觀察Job在Spark集羣上運行的Log和結合源代碼分析出以下流程:socket

  1. 建立SparkConf,設置Spark程序運行時的配置信息;
  2. 建立StreamingContext,設置batchDuration時間間隔來控制Job生成的頻率而且建立Spark Streaming執行的入口;
  3. 在建立StreamingContext的過程當中,會實例化JobScheduler和JobGenerator,調用StreamingContext的start方法時,在JobScheduler.start()內部實例化EventLoop,並執行EventLoop.start()進行消息循環,在JobScheduler的start方法內部會構造JobGenerator和ReveiverTracker,並分別調用它們的start方法;
  4. JobGenerator啓動後會不斷的根據batchDuration生成一個個的Job;
  5. ReceiverTracker啓動後首先在Spark Cluster中啓動Receiver,實際上是在Executor中首先啓動ReceiverSupervisor,Receiver收到數據後會經過ReceiverSupervisor存儲到Executor而且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker內部會經過ReceivedBlockTracker來管理接受到的元數據信息(元數據:數據存儲的位置、索引等)。
  6. 時間不斷的流動,job怎麼產生的?每一個BatchInterval會產生一個具體的Job,其實這裏的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD 的DAG而已,從Java角度講,至關於Runnable接口實例,此時要想運行Job須要提交給JobScheduler,在JobScheduler中經過線程池的方式找到一個單獨的線程來提交Job到集羣運行(實際上是在線程中基於RDD的Action觸發真正的做業的運行);oop

      爲何使用線程池呢?spa

  1. 做業不斷生成,因此爲了提高效率,咱們須要線程池;這和在Executor中經過線程池執行Task有殊途同歸之妙;
  2. 有可能設置了Job的FAIR公平調度的方式,這個時候也須要多線程的支持;

  

2、解密Spark Streaming 容錯架構和運行機制

        

      咱們知道DStream與RDD的關係就是隨着時間流逝不斷的產生RDD,對DStream的操做就是在固定時間上操做RDD。因此從某種意義上而言,Spark Streaming的基於DStream的容錯機制,實際上就是劃分到每一次造成的RDD的容錯機制,線程

      這也是Spark Streaming的高明之處。scala

      Spark Streaming的容錯要考慮兩個方面:日誌

  1. Driver運行失敗時的恢復

    使用Checkpoint,記錄Driver運行時的狀態,失敗後能夠讀取Checkpoint並恢復Driver狀態。

  2. 具體的每次Job運行失敗時的恢復

    要考慮到Receiver的失敗恢復,也要考慮到RDD計算失敗的恢復。Receiver能夠採用寫wal日誌的方式。RDD的容錯是spark core天生提供的,基於RDD的特性,它的容錯機制主要就是兩種:

      01. 基於checkpoint;

              在stage之間,是寬依賴,產生了shuffle操做,lineage鏈條過於複雜和冗長,這時候就須要作checkpoint。

     02. 基於lineage(血統)的容錯:

          通常而言,spark選擇血統容錯,由於對於大規模的數據集,作檢查點的成本很高。考慮到RDD的依賴關係,每一個stage內部都是窄依賴,此時通常基於lineage容錯,方便高效。

 

特別感謝王家林老師的獨具一格的講解:

王家林老師名片:

中國Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公衆號:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

QQ:1740415547

YY課堂:天天20:00現場授課頻道68917580

相關文章
相關標籤/搜索