```scala
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          // Validate that the graph is non-empty, batchDuration is set, there is at least
          // one output DStream, and (if checkpointing is enabled) both the checkpoint
          // directory and checkpointDuration are set
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            // Thread-local property, similar in spirit to a ThreadLocal
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
A StreamingContext can be in one of three states at start time: INITIALIZED, ACTIVE, or STOPPED. Note that scheduler.start() is invoked in a dedicated "streaming-start" thread rather than the main thread, so that thread-local properties such as the call site and job group can be reset without affecting the calling thread.
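From the user side, these state transitions look like the following minimal sketch (the master URL, host, port, and app name are placeholders; a socket server must be listening on the given port for the receiver to actually get data):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StartStopDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("start-demo")
    val ssc = new StreamingContext(conf, Seconds(1)) // state: INITIALIZED
    ssc.socketTextStream("localhost", 9999).print()  // without an output DStream, validate() fails
    ssc.start()                                      // INITIALIZED -> ACTIVE
    ssc.start()                                      // already ACTIVE: only logs a warning
    ssc.stop(stopSparkContext = true)                // ACTIVE -> STOPPED
    // ssc.start()                                   // would now throw IllegalStateException
  }
}
```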
We can see that when JobScheduler is instantiated it creates jobSets and numConcurrentJobs (default 1, i.e. one job at a time, matching a single output stream). There is a performance tuning point here: if you have multiple output streams, you can increase spark.streaming.concurrentJobs so that each output stream does not have to wait for the others, as sketched below.
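A minimal sketch of that tuning knob (the app name is a placeholder; spark.streaming.concurrentJobs is an undocumented setting, so treat this as illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("concurrent-jobs-demo")
  // Sizes the "streaming-job-executor" daemon pool shown below, so up to 4 jobs
  // (e.g. the jobs of several output streams) can run at the same time
  .set("spark.streaming.concurrentJobs", "4")
```

Since jobs may then run concurrently, any ordering assumptions between batches or between output operations no longer hold, so enable this only when your sinks tolerate it.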
```scala
// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
// Listener bus that monitors the various events of streaming jobs
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null

private var eventLoop: EventLoop[JobSchedulerEvent] = null
```
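The comment above jobSets is worth unpacking. In Java 8, ConcurrentHashMap.keySet() was changed to return the more specific ConcurrentHashMap.KeySetView, so bytecode compiled against the concrete class on Java 8 fails on a Java 7 runtime with NoSuchMethodError. Typing the field as java.util.Map makes the call resolve through the interface instead. A small illustration of the difference:

```scala
import java.util.concurrent.ConcurrentHashMap

val asConcrete = new ConcurrentHashMap[String, Int]()
val asInterface: java.util.Map[String, Int] = asConcrete

// Resolves to ConcurrentHashMap.keySet(), whose Java 8 return type
// (KeySetView) does not exist on Java 7
asConcrete.keySet()
// Resolves to Map.keySet(): java.util.Set -- the same bytecode on Java 7 and 8
asInterface.keySet()
```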
Let's continue with the start method:
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // Event handling is decoupled from the actual business logic here, which keeps
  // the code modular and easier to reuse and maintain
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  // Start the listener bus thread that monitors streaming events
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  // jobGenerator generates the jobs for each batch
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
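Spark's EventLoop is a private[spark] utility, but the pattern is simple enough to sketch: events are queued and consumed on a dedicated daemon thread, so the code that posts an event never blocks on, or throws into, the code that handles it. The MiniEventLoop below is an illustrative re-implementation of that idea, not Spark's actual class:

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

// Minimal event-loop sketch: post() enqueues, a daemon thread drains the queue
// and dispatches each event to onReceive, routing handler failures to onError.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take()
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // woken up by stop(); exit quietly
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```

JobScheduler posts JobSchedulerEvents (job started, job completed, error reported) onto such a loop and keeps processEvent as a plain method, which is what the "modular, easy to reuse and maintain" comment refers to.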