This session covers:

1. Deeper thoughts on how Spark Streaming generates jobs
2. A source-code walkthrough of Spark Streaming job generation

Source-code walkthrough:
1. In big-data processing, anything that is not a streaming workload is usually handled by scheduled jobs, triggered every 10 minutes or every hour, say. That already has the flavor of stream processing: batch processing has always been stream processing in disguise, just with a very coarse trigger. From this point of view, data that never flows through a stream loses its value, and all processing will eventually be unified under the streaming model!
DStreams actually come in three kinds:

The first kind is input DStreams, which can be built from a variety of data sources, e.g. Socket, Kafka, or Flume;

The second kind is output DStreams: outputStreams are actions at the logical level; since they still live at the Spark Streaming framework level, they are ultimately translated into physical-level actions, namely RDD actions;

The third kind is DStreams produced by transformations, turning one DStream into another, i.e. derived from other DStreams. The DStreamGraph class records the input DStreams and the output DStreams.

So a DStream arises in one of two ways: either directly from a data source, or from other DStreams.

Spark Streaming uses time as its trigger, whereas Storm is event-triggered, firing on record after record! The sketch below illustrates all three DStream roles as well as the time-based trigger.
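As a concrete illustration, here is a minimal word-count sketch (the host, port, and 5-second batch interval are arbitrary choices for the example): lines is an input DStream, the flatMap/map/reduceByKey results are transformed DStreams, and print() registers an output DStream whose logical action is translated into an RDD action each batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
    // The 5-second batch duration is the time-based trigger: a new batch (and its
    // jobs) is generated every 5 seconds, regardless of how many records arrived.
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999) // input DStream
    val counts = lines.flatMap(_.split(" "))            // transformed DStreams
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.print()                                      // output DStream (logical action)

    ssc.start()
    ssc.awaitTermination()
  }
}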
Dynamic job generation in Spark Streaming has three core components:

JobGenerator: responsible for generating jobs, based on the received data;

JobScheduler: responsible for scheduling jobs, based on the same metadata;

ReceiverTracker: responsible for obtaining the metadata of the received data.

JobGenerator and ReceiverTracker are both members of JobScheduler, as JobScheduler's start method below makes clear!
Tracing the source code yields the following call flow:

StreamingContext.start --> JobScheduler.start --> receiverTracker.start() --> jobGenerator.start() --> EventLoop --> processEvent() --> generateJobs() --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time) --> jobScheduler.inputInfoTracker.getInfo(time) --> jobScheduler.submitJobSet --> startFirstTime() --> graph.start() --> timer.start()
The overall logic is diagrammed as follows:

Note: the diagram is borrowed from http://lqding.blog.51cto.com/9123978/1772958, by a fellow member of the Spark customization course, with deep thanks.

Below, guided by the debugging session and the job-generation flow diagram, we trace the source code step by step:
1. The program entry point:

val ssc = new StreamingContext(conf, Seconds(5))
ssc.start()

2. Step into jobScheduler.start():
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop thread
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Receive the metadata of the source data
  receiverTracker.start()
  // Generate jobs based on that metadata
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
3. Step into the receiverTracker.start() method:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // The metadata that ReceiverTracker receives is kept in ReceiverTrackerEndpoint
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
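When receiver input streams exist, start() also calls launchReceivers(), which ships the receivers out to the executors. For reference, this is how it looks in the Spark 1.6 source (details may differ slightly between versions):

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  // Run a dummy Spark job to ensure that all executors have registered,
  // so the receivers do not all end up on one node
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}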
4. Step into the jobGenerator.start() method. It initializes checkpointing, instantiates and starts the EventLoop message loop, and kicks off the timer that periodically generates jobs:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    // Anonymous inner class overriding onReceive
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
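The periodic trigger itself is a RecurringTimer member of JobGenerator that posts a GenerateJobs event once per batch interval, and startFirstTime() starts both the DStreamGraph and this timer. Quoted from the Spark 1.6 source (minor differences are possible in other versions):

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}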
First, look at the EventLoop class. Its background thread takes events off a queue and calls onReceive on each one; in JobGenerator, onReceive is the anonymous-inner-class override shown above, which delegates to processEvent:

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // The LinkedBlockingDeque that stores the events, and the background thread
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true) // daemon (background) thread
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // The background thread takes an event off the queue...
          val event = eventQueue.take()
          try {
            // ...and processes it; onReceive here is the anonymous-inner-class
            // override, i.e. processEvent
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
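To see the pattern in isolation, here is a minimal, self-contained sketch of the same idea. Since Spark's EventLoop is private[spark], this toy re-implementation (all names invented for illustration) mirrors its structure:

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// A stripped-down re-implementation of the EventLoop pattern, for illustration only.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) onReceive(queue.take()) // block until an event arrives
      } catch { case _: InterruptedException => /* exit */ }
  }
  protected def onReceive(event: E): Unit
  def post(event: E): Unit = queue.put(event)
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
}

object ToyEventLoopDemo extends App {
  val loop = new ToyEventLoop[String]("demo-loop") {
    // Like JobGenerator's anonymous subclass, the override just routes the event.
    override protected def onReceive(event: String): Unit = println(s"processing: $event")
  }
  loop.start()
  Seq("a", "b", "c").foreach(loop.post) // events are queued and handled asynchronously
  Thread.sleep(500) // give the daemon thread time to drain the queue
}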
Now step into the pivotal processEvent method. It pattern-matches on the event type and routes each event to the method that handles it; the actual handling is generally handed off to another thread, since the message loop must not run time-consuming business logic itself:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
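The event types being matched are a small sealed hierarchy of case classes (quoted from the Spark 1.6 source):

/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(
    time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent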
Step into the generateJobs method. Once the data for the batch is in hand, it calls DStreamGraph's generateJobs method to build the jobs, in the five steps marked below:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Step 1: get the data for this batch interval, allocating the received
    // blocks to the batch according to the allotted time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Step 2: generate the jobs and obtain the RDD DAG dependencies;
    // here RDD instances are created from the DStreams
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Step 3: get the streamIdToInputInfos for this batch: the data the
      // batch duration covers, together with the business logic to apply
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Step 4: hand the generated jobs to the jobScheduler
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Step 5: perform checkpointing
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Step into DStreamGraph's generateJobs method:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // The outputStreams are the final DStreams of the whole graph;
      // outputStream.generateJob(time) works backwards through the lineage,
      // much like RDD dependency traversal from back to front
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
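outputStream.generateJob(time) is where the DStream lineage finally materializes into an RDD and a job function. For reference, DStream.generateJob as it appears in the Spark 1.6 source (may vary slightly by version):

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        // Running an empty function over the RDD is enough to materialize it
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}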
Back in EventLoop, step into the onReceive method, whose contract is documented explicitly:

/**
 * Invoked in the event thread when polling events from the event queue.
 *
 * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
 * and cannot process events in time. If you want to call some blocking actions, run them in
 * another thread.
 */
protected def onReceive(event: E): Unit

The loop keeps pulling events from the queue and processes each one as soon as it arrives. Do not put blocking calls inside onReceive, or events will pile up behind them. As a rule, a message loop does not execute concrete business logic itself; upon receiving an event it routes the work to another thread.
The submitJobSet method merely puts the JobSet into a ConcurrentHashMap and wraps each Job in a JobHandler submitted to the jobExecutor thread pool:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
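The jobExecutor pool itself is sized by spark.streaming.concurrentJobs (default 1), which is why a batch's output operations run serially unless that setting is raised. From the Spark 1.6 source:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")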
The JobHandler object implements the Runnable interface; its run method eventually calls the job's run method, which invokes func, i.e. the business logic defined on the DStream.
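For completeness, an abridged sketch of the Job class (condensed from the Spark source; most members are elided here): run simply evaluates the stored function.

import scala.util.Try

// Abridged from org.apache.spark.streaming.scheduler.Job; most members elided.
private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run() {
    _result = Try(func()) // this invokes the DStream-level business logic
  }
}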
Finally, two points to summarize:

1. Job generation is time-driven: JobGenerator's timer posts a GenerateJobs event every batch interval, and the EventLoop routes it to generateJobs.

2. generateJobs allocates the received blocks to the batch, asks DStreamGraph to build logical jobs from the output DStreams, submits the JobSet to JobScheduler (where JobHandler runs each job's func on the jobExecutor pool), and finally triggers a checkpoint.
Special thanks to Mr. Wang Jialin for his distinctive style of teaching.

Wang Jialin's contact card:
China's leading Spark expert
Sina Weibo: http://weibo.com/ilovepains
WeChat public account: DT_Spark
Blog: http://blog.sina.com.cn/ilovepains
QQ: 1740415547
YY classroom: live lectures every day at 20:00, channel 68917580