Topics for this session:
Demystifying the Spark Streaming job architecture and runtime mechanism
Demystifying the Spark Streaming fault-tolerance architecture and runtime mechanism
Any data that cannot be processed as a real-time stream is effectively worthless data. In the era of stream processing, Spark Streaming is highly attractive and has broad prospects; on top of that, thanks to the Spark ecosystem, Streaming can conveniently call into other powerful frameworks such as SQL and MLlib, so it is well placed to dominate the field.
At runtime, Spark Streaming is not so much a stream-processing framework on top of Spark Core as the most complex application built on Spark Core. If you can master Spark Streaming, this most complex of applications, then no other complex application will be out of reach. Choosing Spark Streaming as the entry point for this release-customization series is therefore a natural choice.
In this session we examine Spark Streaming's runtime mechanism from the overall architecture of jobs and of fault tolerance.
We start from the simplest example used before:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

// Word count over a socket source
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
ssc.awaitTermination() // keep the application running until it is stopped
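To try this locally, you can feed port 9999 with a simple TCP source, for example by running nc -lk 9999 (netcat) in another terminal and typing words there; each 5-second batch then prints its word counts.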
Tracing into the source code reveals the following.
When the StreamingContext is initialized, several objects are created, including:
// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)
When the JobScheduler is initialized, it in turn creates the jobGenerator, and it also holds a receiverTracker field.
// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // line 56
Next, look at how the DStream is created:
// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // this method is the key: it creates the Receiver
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
Now look at ssc.start:
// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
At line 610, scheduler.start is called; this scheduler is the JobScheduler created earlier during initialization.
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
Look at line 80, where the receiverTracker is instantiated:
// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
Next come the calls to receiverTracker.start and jobGenerator.start. First, ReceiverTracker.start:
// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
launchReceivers()
// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // this is SocketInputDStream.getReceiver(); in this example it returns a
    // SocketReceiver, see SocketInputDStream.scala line 34
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}
How is the StartAllReceivers message consumed?
// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  // schedule receivers across executors as evenly as possible
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // start the receiver; we stop tracing here, interested readers can keep
    // following the source from startReceiver
    startReceiver(receiver, executors)
  }
Back at JobScheduler.scala line 83, jobGenerator.start is called:
// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
At this point both the data-receiving side and the job generator have been started.
To recap: calling start on the StreamingContext starts JobScheduler.start in a new thread and kicks off the message loop; inside JobScheduler.start, the JobGenerator and ReceiverTracker are constructed and their start methods are called.
1. Once started, JobGenerator keeps generating jobs, one per batchDuration (see the sketch after this list).
2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (strictly speaking, it first starts a ReceiverSupervisor on the Executor). After a Receiver receives data, the ReceiverSupervisor stores it on the Executor and sends the metadata of the stored data to the ReceiverTracker on the Driver, which manages the received metadata internally through a ReceivedBlockTracker.
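As a hedged illustration of point 1: in the Spark 1.6.x source, JobGenerator drives job generation with a recurring timer that posts a GenerateJobs event to its event loop once per batch interval. Simplified and quoted from memory, it looks roughly like this:

// JobGenerator.scala (simplified sketch): a timer fires every batchDuration and
// posts GenerateJobs(time); processEvent then builds the JobSet for that batch
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")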
Every batch interval produces a concrete Job. This Job is not a job in the Spark Core sense: it is just the DAG of RDDs generated from the DStreamGraph for that batch; from a Java point of view it is comparable to a Runnable instance (see the sketch below). To actually run it, the Job has to be submitted to the JobScheduler, which uses a thread pool to find a separate thread and submit the Job to the cluster (the real Spark job is only triggered inside that thread, by the action on the RDDs).
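To make the "Runnable-like" comparison concrete, here is a simplified, hedged sketch of the streaming Job abstraction (based on org.apache.spark.streaming.scheduler.Job in Spark 1.6.x, with most fields and helpers omitted):

import scala.util.Try

// A streaming Job just wraps the batch time and a closure; running it means
// invoking the closure, which contains the RDD action (e.g. the output of print())
private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  def run() {
    _result = Try(func())
  }
}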
Why use a thread pool here? (A sketch of this submission path follows the list.)
1. Jobs are generated continuously, so a thread pool is needed for efficiency; this mirrors the way an Executor runs Tasks through a thread pool.
2. The FAIR scheduling mode may be configured for jobs, which also requires multi-threading support.
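A hedged sketch of that submission path, close to the Spark 1.6.x JobScheduler (simplified, quoted from memory): the pool size comes from spark.streaming.concurrentJobs (default 1), and every generated Job is wrapped in a Runnable JobHandler and handed to the pool.

// JobScheduler (simplified sketch): jobs run on a daemon thread pool
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // each Job becomes a Runnable JobHandler executed by the thread pool
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}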
Part two: looking at Spark Streaming from the fault-tolerance architecture
We know that the relationship between DStream and RDD is that, as time passes, a DStream keeps producing RDDs, and an operation on a DStream is really an operation on the RDDs at fixed time intervals. So in a sense, Spark Streaming's DStream-based fault tolerance comes down to the fault tolerance of the RDD produced for each interval, and that is one of the clever aspects of Spark Streaming.
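A small illustration of this relationship, building on the word-count example above; foreachRDD is a standard DStream output operation that exposes the RDD generated for each batch:

// A DStream operation is really an operation on the RDD of each batch interval.
words.foreachRDD { (rdd, time) =>
  // rdd is the RDD generated for the batch at `time`; everything that applies
  // to an RDD (lineage, persist, checkpoint) applies to it
  println(s"Batch $time produced ${rdd.count()} distinct words")
}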
As a distributed, resilient dataset, an RDD shows its resilience mainly in the following ways (a short code sketch of point 5 follows the list):
1. Storage is allocated between memory and disk automatically, with memory preferred.
2. Lineage-based fault tolerance.
3. A failed task is retried a configured number of times.
4. A failed stage is retried automatically.
5. checkpoint and persist allow intermediate results to be reused.
6. Scheduling elasticity: the DAG and task scheduling are decoupled from resource management.
7. Highly elastic data partitioning.
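As a minimal sketch of point 5, assuming sc is an existing SparkContext (for example ssc.sparkContext) and the checkpoint directory is just a placeholder:

import org.apache.spark.storage.StorageLevel

// persist keeps a frequently reused RDD in memory (spilling according to the
// chosen storage level); checkpoint writes it to reliable storage and truncates
// its lineage, so recovery no longer has to replay the whole chain
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints") // placeholder path
val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _)
pairs.persist(StorageLevel.MEMORY_AND_DISK)
pairs.checkpoint() // materialized by the next action, e.g. the count() below
pairs.count()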
Given these RDD characteristics, fault tolerance boils down to two mechanisms: checkpointing, and lineage-based recovery. In general Spark relies on lineage, because checkpointing a large dataset is expensive. In some cases, however, for example when the lineage chain becomes too long and complex, a checkpoint is needed.
Consider RDD dependencies: within a stage the dependencies are narrow, so lineage-based recovery is usually convenient and efficient there; between stages the dependencies are wide and involve a shuffle, where a checkpoint works better. In short: rely on lineage inside a stage and checkpoint between stages.
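In Spark Streaming this maps onto the checkpoint APIs of StreamingContext and DStream. A minimal sketch building on the earlier example; the directory is a placeholder and the interval is only a common rule of thumb (several times the batch interval):

// Enable checkpointing for the streaming application (metadata and data)
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints") // placeholder path

// Optionally control how often this DStream's RDDs are checkpointed
words.checkpoint(Durations.seconds(25))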
What deeper internals come next? Stay tuned for the next installment.