Spark Customization Part 6: Spark Streaming Source Code Analysis of Dynamic Job Generation and Some Deeper Thinking

Contents of this installment:

1. Deeper thinking on Spark Streaming Job generation

2. Source code analysis of Spark Streaming Job generation

1. Deeper Thinking on Spark Streaming Job Generation

Analysis:

1. In big-data processing scenarios, work that is not stream processing is usually done as a scheduled job, for example triggered every 10 minutes or every hour; that already has the feel of stream processing. Anything that is not stream processing, or data that has nothing to do with stream processing, will end up being data of no value. Even the batch processing we did in the past was, in effect, implicitly doing stream processing. In the end, all processing will be unified under stream processing!!

There are actually three kinds of DStreams:

The first kind is input DStreams, which can be built from a variety of data sources, for example Socket, Kafka, or Flume;

The second kind is output DStreams. outputStreams are logical-level actions; since they are still at the Spark Streaming framework level, they are ultimately translated into physical-level actions, i.e., RDD actions;

The third kind is transformation operations that turn one DStream into another, i.e., DStreams derived from other DStreams. The DStreamGraph class records both the source (input) DStreams and the output DStreams.

There are two ways a DStream can be produced:

A DStream is either produced directly from a data source or derived from other DStreams;

Spark Streaming uses time as the trigger for its stream processing, whereas Storm is triggered by events, one record after another!!
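To make these roles concrete, here is a minimal sketch of a Spark Streaming program (the host, port, and word-count logic are illustrative placeholders, not part of the original lecture): an input DStream built from a socket, transformations that derive new DStreams, and print() as the output operation, all driven by a 5-second batch interval so that time is the trigger.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamRolesDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamRolesDemo")
    // Time is the trigger: a new batch (and hence a new set of jobs) every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))

    // 1. Input DStream: built from a data source (a socket here; hostname and port are placeholders)
    val lines = ssc.socketTextStream("localhost", 9999)

    // 2. Transformations: each operation derives a new DStream from an existing one
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // 3. Output operation: print() registers a logical-level action in the DStreamGraph;
    //    it is translated into a physical RDD action when each batch's job is generated
    wordCounts.print()

    ssc.start()            // this is where JobScheduler.start() and the flow analyzed below begin
    ssc.awaitTermination()
  }
}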

 

2. Source Code Analysis of Spark Streaming Job Generation

Dynamic Spark job generation has three core components:
JobGenerator: responsible for generating jobs, based on the source data;
JobScheduler: responsible for scheduling jobs, based on the source data;
ReceiverTracker: obtains the metadata.

JobGenerator and ReceiverTracker are members of JobScheduler, as can be seen from JobScheduler's start method!!

Tracing the source code yields the following execution flow:

StreamingContext.start()-->JobScheduler.start()-->receiverTracker.start()-->jobGenerator.start()-->EventLoop-->processEvent()-->generateJobs()-->jobScheduler.receiverTracker.allocateBlocksToBatch(time)-->graph.generateJobs(time)-->jobScheduler.inputInfoTracker.getInfo(time)-->jobScheduler.submitJobSet-->startFirstTime()-->graph.start()-->timer.start()

The overall logic is shown in the following diagram:

Note: the diagram is taken from the work of a fellow member of the Spark customization class, http://lqding.blog.51cto.com/9123978/1772958, with deep thanks.

 

Below, combining the debugging process with the job-generation structure diagram, we trace through the source code step by step:

1. val ssc = new StreamingContext(conf, Seconds(5))

   ssc.start() is the entry point at which the program starts running;

2. Enter jobScheduler.start():

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop processing thread
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Receive data from the sources
  receiverTracker.start()
  // Generate jobs based on the received data
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

3. Enter the receiverTracker.start() method:

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      // The ReceiverTracker keeps the metadata about the received data in the ReceiverTrackerEndpoint
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

4. Enter the jobGenerator.start() method:

// Initializes the checkpoint, instantiates and starts the EventLoop message loop,
// and starts the timer that periodically generates jobs.
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    // Anonymous inner class that overrides onReceive
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

First, look at the EventLoop class:

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 * In the EventLoop class a background thread takes messages from the queue and calls onReceive
 * to process each one; the onReceive here is the processEvent method supplied by the anonymous
 * inner class shown above.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // The LinkedBlockingDeque that stores the messages, plus the background thread that drains it
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // daemon (background) thread
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // The background thread takes a message from the queue
          val event = eventQueue.take()
          try {
            // Process the message; onReceive here is the processEvent method overridden in the
            // anonymous inner class
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

Now enter the most important method, processEvent:

// processEvent pattern-matches on the event type and routes the event to the method that handles
// that kind of message. The actual handling is generally handed off to another thread; the
// message loop itself does not run time-consuming business logic.
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

Enter the generateJobs method:

Once the data has been obtained, DStreamGraph's generateJobs method is called to generate the jobs, following the steps below:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Step 1: get the data that belongs to this batch interval; allocate the received blocks
    // to the batch according to the batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Step 2: generate the jobs and obtain the RDD DAG dependencies; RDD instances are created
    // here on the basis of the DStreams
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Step 3: get streamIdToInputInfos, i.e., the data to be processed for this batchDuration
      // together with the business logic that should be applied to it
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Step 4: hand the generated jobs over to the jobScheduler
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Step 5: perform the checkpoint
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

Enter DStreamGraph's generateJobs method:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // The outputStreams here are the final DStreams of the whole DStream lineage.
      // outputStream.generateJob(time) works backwards from the end, much like RDD lineage
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

  

Enter the onReceive method:

/**
 * Invoked in the event thread when polling events from the event queue.
 *
 * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
 * and cannot process events in time. If you want to call some blocking actions, run them in
 * another thread.
 *
 * onReceive keeps taking messages from the event queue and handles each one as soon as it
 * arrives. Do not put blocking work inside onReceive, otherwise the message loop itself will
 * keep getting blocked. A message loop generally does not run the concrete business logic;
 * when it sees a message it normally routes it to another thread for processing.
 */
protected def onReceive(event: E): Unit

The submitJobSet method simply puts the JobSet into a ConcurrentHashMap, wraps each Job as a JobHandler, and submits it to the jobExecutor thread pool:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}


private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  

The JobHandler object implements the Runnable interface; the Job's run method ends up invoking func, i.e., the business logic defined on the DStream.


  

Finally, two points to summarize:

  1. An RDD action triggers the execution of a job. Here the job is wrapped behind a Runnable-style interface: a method is defined whose body is the RDD code generated from the DStream dependencies. During translation the DStream dependencies are turned into RDD dependencies, and because the last DStream in the chain is action-level, the last RDD operation after translation is also action-level. If that translation were executed directly, a job would be produced immediately and there would be no queue at all, so the translated code is placed inside a function (method) body; as long as that function has not been invoked by the designated trigger, the job cannot run.
  2. Spark Streaming keeps managing the jobs we generate according to time. Every job contains an action-level operation, which is a logical-level operation on the DStream. When a Job is generated and put into the queue, it has been translated into RDD operations whose last operation is action-level; if that action fired directly at translation time, the whole Spark Streaming job would escape the framework's management. So we must both perform the translation and keep the job under control: the dependencies between DStreams are converted into dependencies between RDDs, the action on the last DStream is translated into an RDD action, and the whole translated piece of code is placed inside a function body. Because that function is only defined and not yet executed, the RDD action inside it does not run and no job is triggered. When the JobScheduler later schedules the job, a thread is taken from the thread pool to execute the method that was wrapped earlier (see the sketch below).
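To illustrate this "define now, run later" mechanism, here is a minimal, self-contained sketch in plain Scala (MiniJob and DeferredActionDemo are made-up names for illustration; this is not the Spark source): the translated action sits inside a function body, and nothing executes until a scheduler thread later calls run(), which is analogous to how JobHandler finally calls job.run().

import java.util.concurrent.Executors

// Analogous to Spark Streaming's Job: it only stores the translated work as an unexecuted function
class MiniJob(body: () => Unit) {
  def run(): Unit = body() // only here does the wrapped "action" actually execute
}

object DeferredActionDemo {
  def main(args: Array[String]): Unit = {
    // "Translating" the logical operation yields a function definition, not an executed action,
    // so constructing the job triggers nothing yet
    val job = new MiniJob(() => println("the translated RDD action would run here"))

    // Only when a scheduler thread picks the job up (analogous to JobHandler in the thread pool)
    // does the wrapped action fire
    val jobExecutor = Executors.newFixedThreadPool(1)
    jobExecutor.execute(new Runnable { override def run(): Unit = job.run() })
    jobExecutor.shutdown()
  }
}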

Special thanks to Mr. Wang Jialin for his distinctive style of teaching.

Wang Jialin's contact card:

The No. 1 Spark expert in China

Sina Weibo: http://weibo.com/ilovepains

WeChat public account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

QQ: 1740415547

YY classroom: live lectures every day at 20:00, channel 68917580
