How exactly does a Spark Streaming job run? Let's work through it with an example:
package com.dt.spark.streaming

import com.dt.spark.common.ConnectPool
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Ranks the hottest search keywords of a website and writes the results to MySQL.
 * Created by dinglq on 2016/5/3.
 */
object WriteDataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WriteDataToMySQL")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Assume the socket input format is: searchKeyword,time
    val ItemsStream = ssc.socketTextStream("localhost", 9999)
    // Map the input into (searchKeyword, 1)
    val ItemPairs = ItemsStream.map(line => (line.split(",")(0), 1))
    val ItemCount = ItemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
    // ssc.checkpoint("/user/checkpoints/")
    // val ItemCount = ItemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, (v1: Int, v2: Int) => v1 - v2, Seconds(60), Seconds(10))
    /**
     * Next we need to sort the keywords by frequency, but DStream provides no sort method,
     * so we use transform and sort with the RDD's sortByKey.
     */
    val hottestWord = ItemCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })
    hottestWord.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val conn = ConnectPool.getConnection
        conn.setAutoCommit(false) // commit manually
        val stmt = conn.createStatement()
        partitionOfRecords.foreach(record => {
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'" + record._1 + "','" + record._2 + "')")
        })
        stmt.executeBatch()
        conn.commit() // commit the transaction
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
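The example depends on a ConnectPool helper in com.dt.spark.common that is not shown in this post. A minimal sketch of what such a helper could look like, assuming the MySQL JDBC driver is on the classpath and using a placeholder JDBC URL and credentials:

package com.dt.spark.common

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// A deliberately simple connection "pool": it reuses returned connections when available
// and otherwise opens new ones. A real application would use a proper pool such as HikariCP or DBCP.
object ConnectPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  // Placeholder connection settings; adjust to your environment.
  private val url = "jdbc:mysql://localhost:3306/spark"
  private val user = "root"
  private val password = "root"

  Class.forName("com.mysql.jdbc.Driver")

  def getConnection: Connection = {
    val conn = pool.poll()
    if (conn != null && !conn.isClosed) conn
    else DriverManager.getConnection(url, user, password)
  }

  def returnConnection(conn: Connection): Unit = {
    if (conn != null && !conn.isClosed) pool.offer(conn)
  }
}

Note that the streaming job above never returns its connections to the pool; with a helper like this you would call returnConnection (or close the connection) in a finally block inside foreachPartition.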
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
/**
 * Wait for the execution to stop. Any exceptions that occurs during the execution
 * will be thrown in this thread.
 */
def awaitTermination() {
  waiter.waitForStopOrError()
}
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}
private[streaming] val scheduler = new JobScheduler(this)
private val jobGenerator = new JobGenerator(this)
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
val listenerBus = new StreamingListenerBus()
val ItemsStream = ssc.socketTextStream("localhost", 9999)
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
ssc.graph.addInputStream(this)
DStreamGraph.scala, line 83:

def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}
override protected[streaming] val rateController: Option[RateController] = {
  if (RateController.isBackPressureEnabled(ssc.conf)) {
    Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
  } else {
    None
  }
}
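The rate controller above is created only when backpressure is enabled in the application's configuration. A minimal sketch of turning it on (Spark 1.5 or later):

import org.apache.spark.SparkConf

// Enabling backpressure makes each ReceiverInputDStream create a ReceiverRateController,
// which JobScheduler.start() later registers as a StreamingListener.
val conf = new SparkConf()
  .setAppName("WriteDataToMySQL")
  .set("spark.streaming.backpressure.enabled", "true")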
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
/**
 * :: Experimental ::
 *
 * Either get the currently active StreamingContext (that is, started but not stopped),
 * OR recreate a StreamingContext from checkpoint data in the given path. If checkpoint data
 * does not exist in the provided, then create a new StreamingContext by calling the provided
 * `creatingFunc`.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new StreamingContext
 * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
 *                       file system
 * @param createOnError  Optional, whether to create a new StreamingContext if there is an
 *                       error in reading checkpoint data. By default, an exception will be
 *                       thrown on error.
 */
@Experimental
def getActiveOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  ACTIVATION_LOCK.synchronized {
    getActive().getOrElse {
      getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)
    }
  }
}
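For reference, the more common driver-side pattern uses getOrCreate, which getActiveOrCreate falls back to. A minimal usage sketch, reusing the checkpoint directory from the commented-out line in the example above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoverySketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/user/checkpoints/") // same directory as in the example's commented-out code
    // Define the DStream graph here; a trivial output operation so the context is valid.
    ssc.socketTextStream("localhost", 9999).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from checkpoint data if it exists, otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate("/user/checkpoints/", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}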
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
eventLoop.start()
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
def addStreamingListener(streamingListener: StreamingListener) {
  scheduler.listenerBus.addListener(streamingListener)
}
17. Instantiate the receiverTracker and inputInfoTracker, and call receiverTracker's start method.
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
18. In receiverTracker's start method, a ReceiverTrackerEndpoint object is constructed (ReceiverTracker.scala, line 149).
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
19. Get each InputDStream's receiver and start the Receivers on the corresponding worker nodes (ReceiverTracker.scala, line 413).
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
20. ReceiverTrackerEndpoint receives the StartAllReceivers message and handles it as follows.
ReceiverTracker.scala, line 449:
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
  }
The receivers are started on the executors; as this code shows, there can be more than one receiver.
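For instance, an application that creates several input DStreams gets one Receiver per stream. A minimal sketch (hostname and ports are placeholders) that unions three socket streams:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MultiReceiverSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each socketTextStream creates its own ReceiverInputDStream, and therefore its own Receiver.
    val streams = (1 to 3).map(i => ssc.socketTextStream("localhost", 9998 + i))

    // Union the per-receiver streams into a single DStream for downstream processing.
    val unified = ssc.union(streams)
    unified.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}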
21. Then we return to the code of step 13 and call JobGenerator.start().
JobGenerator.scala, line 78:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
22. Construct an EventLoop[JobGeneratorEvent] and call its start method.
eventLoop.start()
23. When the application starts, it checks whether checkpoint data should be used for recovery and calls either restart or startFirstTime accordingly. Our code will call
startFirstTime()
JobGenerator.scala, line 190:
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
24. Call DStreamGraph's start method.
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}
You might expect the InputDStreams to start receiving data at this point. However, the start methods of both InputDStream and ReceiverInputDStream are empty.
InputDStream.scala, line 110:
/** Method called to start receiving data. Subclasses must implement this method. */
def start()
ReceiverInputDStream.scala, line 63:
// Nothing to start or stop as both taken care of by the ReceiverTracker.
def start() {}
SocketInputDStream itself does not define a start method, so
inputStreams.par.foreach(_.start())
並無作任何的事情,那麼輸入流究竟是怎麼被觸發並開始接收數據的呢?
Let's go back to step 20 above:
startReceiver(receiver, executors)
The implementation is in ReceiverTracker.scala, line 545:
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
It wraps the Receiver into an RDD and submits it to the Spark cluster as a job. The second parameter of submitJob is a function whose job is to start the receiver on the worker node.
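The submitJob mechanism itself is ordinary Spark Core API and can be seen in isolation. A minimal sketch of submitting a function to run over one partition of an RDD and reacting to its completion (the class name and payload are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SubmitJobSketch").setMaster("local[2]"))

    // A one-element RDD with a single partition, analogous to the single-Receiver RDD built in startReceiver.
    val rdd = sc.makeRDD(Seq("payload"), 1)

    // The function shipped to the executor; ReceiverTracker ships startReceiverFunc the same way.
    val runOnExecutor: Iterator[String] => Unit =
      iter => iter.foreach(v => println(s"running on an executor with: $v"))

    // Submit a job over partition 0 only and get back a future for its completion.
    val future = sc.submitJob[String, Unit, Unit](rdd, runOnExecutor, Seq(0), (_, _) => (), ())

    future.onComplete {
      case Success(_) => println("executor-side function finished")
      case Failure(e) => println(s"executor-side function failed: $e")
    }

    // Block until the submitted job is done; FutureAction is a scala.concurrent.Future.
    Await.ready(future, Duration.Inf)
    sc.stop()
  }
}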
Note that data is received by means of a task. What happens when that task fails?
(1) A pluggable ReceiverSchedulingPolicy
The main purpose of ReceiverSchedulingPolicy is to compute, at the Spark Streaming level, the target executors on which each Receiver should run. Earlier versions relied on Spark Core's TaskScheduler for generic scheduling; the ReceiverSchedulingPolicy has a better semantic understanding of the Streaming application and can therefore compute a better placement. It has two methods, used as follows (a toy sketch of the idea appears after this list):
When the Streaming application starts for the first time:
  - collect all Receiver instances contained in all InputDStreams (the receivers)
  - collect all executors as candidate target locations
  - then call ReceiverSchedulingPolicy.scheduleReceivers(receivers, executors) to compute the list of target executors for each Receiver
While the Streaming application is running, if a Receiver needs to be restarted:
  - first check whether any of the previously computed target executors are still alive
  - if none are, call ReceiverSchedulingPolicy.rescheduleReceiver(receiver, ...) to recompute the list of target executors for that Receiver
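As a toy illustration of the first case (this is deliberately not the real ReceiverSchedulingPolicy API, just the idea of round-robin placement over known executors):

// A toy illustration only: round-robin placement of receivers over known executors.
object RoundRobinSchedulingSketch {
  def scheduleReceivers(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, Seq[String]] = {
    if (executors.isEmpty) {
      // No candidate executors known yet: leave placement to the TaskScheduler.
      receiverIds.map(id => id -> Seq.empty[String]).toMap
    } else {
      receiverIds.zipWithIndex.map { case (id, i) =>
        id -> Seq(executors(i % executors.length))
      }.toMap
    }
  }

  def main(args: Array[String]): Unit = {
    val placement = scheduleReceivers(Seq(0, 1, 2), Seq("host1:1234", "host2:1234"))
    placement.foreach { case (id, where) => println(s"receiver $id -> $where") }
  }
}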
(2) Each Receiver is distributed by its own dedicated Job
This job contains only a single task, and only on the task's first attempt does it try to start the Receiver. If that task fails and is rescheduled by the TaskScheduler to another executor, it no longer tries to start the Receiver and simply performs a no-op, so the job finishes with a "success" status. The ReceiverTracker then launches another job, possibly recomputing the Receiver's target location, to try distributing the Receiver again, and keeps doing so until it succeeds. Furthermore, because Spark Core only consults, and usually but not always honors, the preferredLocation set by Spark Streaming when dispatching tasks, the job distributing the Receiver may still end up on an executor other than the one we wanted. For that reason, on its first attempt the task first sends a RegisterReceiver message to the ReceiverTracker and starts the Receiver only after receiving a positive reply; otherwise it again performs a no-op, the job finishes successfully, and the ReceiverTracker launches yet another job to retry the distribution, again until it succeeds. This guarantees that the receiving task stays running and keeps feeding data into the cluster.
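The "launch another job until it succeeds" behaviour boils down to resubmitting from the completion callback, as in the future.onComplete block of startReceiver above. A toy, self-contained illustration of the pattern with plain Scala futures (all names are made up; the real code resubmits by sending RestartReceiver to the ReceiverTrackerEndpoint):

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Toy illustration of "keep resubmitting the receiver job until the tracker is stopped".
object KeepRestartingSketch {
  private val stopped = new AtomicBoolean(false)
  private implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for the one-task receiver job; real code would call sc.submitJob(...).
  def submitOnce(attempt: Int): Future[Unit] = Future {
    Thread.sleep(100)
    if (attempt < 3) sys.error(s"attempt $attempt failed")
  }

  def keepRunning(attempt: Int = 0): Unit = {
    submitOnce(attempt).onComplete {
      case _ if stopped.get() => ()                        // tracker stopped: give up
      case Success(_)         => keepRunning(attempt + 1)  // receiver job ended: resubmit
      case Failure(_)         => keepRunning(attempt + 1)  // receiver job failed: resubmit
    }
  }

  def main(args: Array[String]): Unit = {
    keepRunning()
    Thread.sleep(1000)   // let a few attempts run
    stopped.set(true)    // simulate ReceiverTracker.stop()
  }
}

Returning to startReceiverFunc: on its first attempt the task creates and starts a ReceiverSupervisorImpl, as shown next.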
val supervisor = new ReceiverSupervisorImpl(
  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
The supervisor.start method runs the following code.
ReceiverSupervisor.scala, line 127:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
The onStart() method is implemented in ReceiverSupervisorImpl (ReceiverSupervisorImpl.scala, line 172).
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
In startReceiver, the receiver's onStart method is called to start the receiver.
Note: it is important to be clear about the difference between ReceiverInputDStream and Receiver. The Receiver is what actually receives the data, while ReceiverInputDStream is a layer of wrapping around the Receiver that turns the received data into a DStream.
In our example, the Receiver is obtained through SocketInputDStream's getReceiver method (called back in step 19).
SocketInputDStream.scala, line 42:
def getReceiver(): Receiver[T] = {
  new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
SocketReceiver then continuously pulls data from the socket.
Let's look at SocketReceiver's onStart method:
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
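SocketReceiver is just one implementation of the general Receiver contract: onStart must return quickly after spawning a receiving thread, store hands records to Spark through the ReceiverSupervisor, and restart asks the supervisor to tear the receiver down and start it again. For comparison, a minimal custom Receiver sketch following the same pattern (host, port, and the line-oriented protocol are placeholders):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal line-oriented custom receiver, following the same pattern as SocketReceiver.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // onStart must return quickly, so the actual receiving happens in a daemon thread.
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receiving thread checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)            // hand the record to Spark via the ReceiverSupervisor
        line = reader.readLine()
      }
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// It would be wired into an application with something like:
//   val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))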