本期內容:
1,在線動態計算分類最熱門商品案例回顧與演示
代碼如下:
package com.dt.spark.streaming_scala

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Uses Spark Streaming + Spark SQL to compute, online, the ranking of the most popular items
 * within each e-commerce category (for example the three hottest phones in the "phone"
 * category, or the three hottest TVs in the "TV" category).
 *
 * @author DT大數據夢工廠
 * Sina Weibo: http://weibo.com/ilovepains/
 *
 * Technique: Spark Streaming + Spark SQL. Spark Streaming can use ML, SQL, GraphX and so on
 * because of interfaces such as foreachRDD and transform, which operate on plain RDDs; with the
 * RDD as the foundation every other Spark feature can be used as if calling an ordinary API.
 *
 * Input records are space separated in the form `user item category`,
 * e.g. `Rocky Samsung Android`.
 */
object OnlineTheTop3ItemForEachCategory2DB {

  /**
   * Entry point: reads click logs from a socket, aggregates clicks per (category, item) over a
   * sliding window, ranks the items of each category with a SQL window function and stores the
   * top three of every category through JDBC.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Step 1: create the SparkConf holding the runtime configuration of the application.
    // setMaster selects the cluster master URL; "local" would run the program locally instead.
    val conf = new SparkConf()
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // name shown in the monitoring UI
    conf.setMaster("spark://Master:7077") // run on the Spark cluster

    // The batch duration controls how often jobs are generated; StreamingContext is the
    // entry point of Spark Streaming.
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    // Split every line once, drop lines that do not carry all three fields (instead of failing
    // the task with an index error), and key by (category, item). A tuple key, unlike a
    // "_"-joined string, stays correct when a category or item itself contains '_'.
    val formattedUserClickLogsDStream = userClickLogsDStream
      .map(_.split(" "))
      .filter(_.length >= 3)
      .map(fields => ((fields(2), fields(1)), 1))

    // 60 second window sliding every 20 seconds; the inverse function (_ - _) lets the window
    // be updated incrementally.
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map { case ((category, item), clickCount) =>
          Row(category, item, clickCount)
        }

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

        categoryItemDF.registerTempTable("categoryItemTable")

        // row_number() partitioned by category keeps the three most clicked items per category.
        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        resultDataFrame.show()

        resultDataFrame.rdd.foreachPartition { partitionOfRecords =>
          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            try {
              // Bind values as parameters instead of concatenating them into the SQL text, so
              // quotes in the streamed data can neither break nor inject into the statement.
              // NOTE(review): the target column is spelled `client_count` while the DataFrame
              // column is `click_count`; kept as in the original — confirm against the table.
              val stmt = connection.prepareStatement(
                "insert into categorytop3(category,item,client_count) values(?,?,?)")
              try {
                partitionOfRecords.foreach { record =>
                  stmt.setString(1, record.getAs[String]("category"))
                  stmt.setString(2, record.getAs[String]("item"))
                  stmt.setInt(3, record.getAs[Int]("click_count"))
                  stmt.executeUpdate()
                }
              } finally {
                stmt.close()
              }
            } finally {
              // Always hand the connection back, even when an insert fails.
              ConnectionPool.returnConnection(connection) // return to the pool for future reuse
            }
          }
        }
      }
    }

    /**
     * Calling start on the StreamingContext internally starts the JobScheduler, which runs a
     * message loop. Inside JobScheduler.start a JobGenerator and a ReceiverTracker are
     * constructed and started:
     *   1. Once started, the JobGenerator keeps generating jobs according to batchDuration.
     *   2. Once started, the ReceiverTracker first launches the receivers in the Spark cluster
     *      (in fact the ReceiverSupervisor is started first inside the executor). When a
     *      receiver gets data, the ReceiverSupervisor stores it in the executor and sends the
     *      metadata to the ReceiverTracker in the driver, which manages the received metadata
     *      through a ReceivedBlockTracker.
     * Every batch interval produces a concrete job. This job is not a job in the Spark Core
     * sense; it is only the DAG of RDDs generated from the DStreamGraph, comparable to a
     * Runnable instance in Java terms. To run it, it is submitted to the JobScheduler, which
     * uses a thread pool to pick a separate thread that submits the job to the cluster (the
     * real job is triggered inside that thread by an RDD action). Why a thread pool?
     *   1. Jobs are generated continuously, so a thread pool improves efficiency, much like
     *      tasks being executed through a thread pool inside an executor.
     *   2. FAIR scheduling may be configured for jobs, which also requires multiple threads.
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
2,基於案例貫通Spark Streaming的運行源碼
StreamingContext在構造的時候建立了SparkContext,這足以說明Spark Streaming是運行在Spark上的一個應用程序。
/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  // Builds a brand-new SparkContext from `conf` and delegates to another constructor,
  // passing null as its second argument.
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

/**
 * Creates a new SparkContext from the given configuration.
 * @param conf the Spark configuration to build the context from
 * @return the newly constructed SparkContext
 */
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
透過ssc.socketTextStream("localhost", 9999)來建立一個SocketInputDStream。
/**
 * Create an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @return a receiver-based input stream of the received lines
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  // Delegates to the generic socketStream, using SocketReceiver.bytesToLines as the converter.
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

/**
 * Create an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 * @return a new SocketInputDStream bound to this StreamingContext
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
其中SocketInputDStream類如下,繼承ReceiverInputDStream,實現getReceiver方法,返回SocketReceiver對象。
/**
 * Input stream whose receiver is a SocketReceiver built from the given host and port.
 *
 * @param ssc_           the StreamingContext passed on to ReceiverInputDStream
 * @param host           host handed to the SocketReceiver
 * @param port           port handed to the SocketReceiver
 * @param bytesToObjects function turning the socket's InputStream into an iterator of records
 * @param storageLevel   storage level handed to the SocketReceiver
 * @tparam T type of the records produced by `bytesToObjects`
 */
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  /** Returns a new SocketReceiver configured with this stream's constructor arguments. */
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
SocketInputDStream的繼承關係爲SocketInputDStream->ReceiverInputDStream->InputDStream->DStream,各類的聲明如下:
abstract class DStream[T: ClassTag] ( @transient private[streaming] var ssc: StreamingContext ) extends Serializable with Logging { abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext) extends DStream[T](ssc_) { abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext) extends InputDStream[T](ssc_) { private[streaming] class SocketInputDStream[T: ClassTag]( ssc_ : StreamingContext, host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel ) extends ReceiverInputDStream[T](ssc_) { |
那麼DStream和RDD是什麼關係呢?
DStream是生成RDD的模板,是邏輯級別,當達到Interval的時候這些模板會被BatchData實例化成爲RDD和DAG。
DStream是生成RDD的,將生成的RDD放在HashMap中。
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 *
 * @param time the batch time whose RDD is requested
 * @return the cached or freshly computed RDD; None when `time` is not valid for this stream
 *         or when compute(time) yields nothing
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        // Checkpoint only on batches that fall on a multiple of the checkpoint duration.
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Remember the RDD so later lookups for the same time hit the map above.
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
ssc.start()方法用來啓動StreamingContext。因爲Spark應用程序不能有多個SparkContext對象實例,所以Spark Streaming框架在啓動時會對狀態進行判斷。
/**
 * Start the execution of the streams.
 *
 * Behaviour depends on the current `state`: INITIALIZED starts the scheduler and moves to
 * ACTIVE, ACTIVE only logs a warning, STOPPED throws.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            // On failure: stop the scheduler, mark the context as stopped and rethrow.
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
透過scheduler.start()來看下JobScheduler的啓動過程:啓動了消息循環系統和監聽器總線,建立了ReceiverTracker和InputInfoTracker,並啓動ReceiverTracker與JobGenerator。
/**
 * Starts the scheduler: creates and starts the event loop, registers the rate controllers of
 * the input streams as streaming listeners, starts the listener bus, creates the
 * ReceiverTracker and InputInfoTracker, then starts the ReceiverTracker and the JobGenerator.
 * Returns immediately when an event loop already exists.
 */
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // Message-driven event loop
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the event loop
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  // Start the job generator
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

// Message handler; it processes three kinds of events: job started, job completed,
// and error reported.
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    // Any failure while handling an event is routed to reportError.
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
先看下ReceiverTracker的啓動過程,內部實例化ReceiverTrackerEndpoint這個Rpc消息通訊體。
/**
 * Starts the tracker. When there is at least one receiver input stream it registers a
 * ReceiverTrackerEndpoint under the name "ReceiverTracker", launches the receivers unless
 * `skipReceiverLaunch` is set, and moves `trackerState` to Started.
 *
 * Throws SparkException when the tracker has already been started.
 */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
在ReceiverTracker啓動的過程中會調用其launchReceivers方法。
/**
 * Collects one receiver per ReceiverInputDStream, tags each with the id of its stream,
 * runs the dummy Spark job and then asks the endpoint to start all of the receivers.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { inputStream =>
    val receiver = inputStream.getReceiver()
    receiver.setReceiverId(inputStream.id)
    receiver
  }

  runDummySparkJob()

  logInfo(s"Starting ${receivers.length} receivers")
  endpoint.send(StartAllReceivers(receivers))
}
其中調用了runDummySparkJob方法來啓動Spark Streaming框架的第一個Job,其中collect這個action操作會觸發Spark Job的執行。這個方法是爲了確保每個Slave都已註冊上,避免所有Receiver都被調度到同一個節點,以便後續計算時負載均衡。
/**
 * Runs a throwaway Spark job so that all slaves have registered before receivers are
 * scheduled, which keeps the receivers from all landing on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  val sc = ssc.sparkContext
  // The dummy job is skipped in local mode.
  if (!sc.isLocal) {
    val pairs = sc.makeRDD(1 to 50, 50).map(i => (i, 1))
    pairs.reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
還調用了endpoint.send(StartAllReceivers(receivers))方法,由Rpc消息通訊體發送StartAllReceivers消息。ReceiverTrackerEndpoint自己接收到該消息後,先根據調度策略得到各Receiver應在哪些Executor上運行,然後再調用startReceiver(receiver, executors)方法來啓動Receiver。
override def receive: PartialFunction[Any, Unit] = { // Local messages case StartAllReceivers(receivers) => val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver <- receivers) { val executors = scheduledLocations(receiver.streamId) updateReceiverScheduledExecutors(receiver.streamId, executors) receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation startReceiver(receiver, executors) } |
在startReceiver方法中,透過ssc.sparkContext.submitJob提交Job時傳入了startReceiverFunc這個函數,該函數是在Executor上執行的。startReceiverFunc內部實例化ReceiverSupervisorImpl對象,由該對象對Receiver進行管理和監控。這個Job是Spark Streaming框架爲我們啓動的第二個Job,且會一直運行,因爲supervisor.awaitTermination()會阻塞等待直到退出。
/**
 * Start a receiver along with its scheduled executors
 *
 * The receiver is wrapped in a one-element RDD and run through `submitJob`; when that job
 * completes (successfully or not) the receiver is restarted unless the tracker is stopping
 * or stopped.
 *
 * @param receiver           the receiver to run
 * @param scheduledLocations locations used as preferred locations of the receiver RDD;
 *                           may be empty
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      // Only the first task attempt actually runs the receiver.
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // Instantiate the supervisor that manages this receiver
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        // Blocks here, which keeps this task (and therefore the job) running.
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
接下來看下ReceiverSupervisorImpl的啓動過程:先啓動所有已註冊的BlockGenerator對象,然後向ReceiverTrackerEndpoint發送RegisterReceiver消息,若註冊成功再調用receiver的onStart方法。
/** Starts the supervisor: runs the onStart hook first, then starts the receiver. */
def start(): Unit = {
  onStart()
  startReceiver()
}

/** Starts every registered block generator. */
override protected def onStart(): Unit = {
  for (generator <- registeredBlockGenerators) {
    generator.start()
  }
}

/**
 * Starts the receiver once onReceiverStart() reports success; otherwise stops the
 * supervisor. Any non-fatal error raised on the way also stops the supervisor.
 */
def startReceiver(): Unit = synchronized {
  try {
    val accepted = onReceiverStart()
    if (!accepted) {
      // The driver refused us
      stop(s"Registered unsuccessfully because Driver refused to start receiver $streamId", None)
    } else {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    }
  } catch {
    case NonFatal(t) =>
      stop(s"Error starting receiver $streamId", Some(t))
  }
}

/** Sends a RegisterReceiver message to the tracker endpoint and returns its Boolean reply. */
override protected def onReceiverStart(): Boolean = {
  val registration = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](registration)
}
其中在Driver運行的ReceiverTrackerEndpoint對象接收到RegisterReceiver消息後,將streamId, typ, host, executorId, receiverEndpoint封裝爲ReceiverTrackingInfo保存到內存對象receiverTrackingInfos這個HashMap中。
// Handles request/reply messages. NOTE(review): this excerpt shows only the first two cases
// and the closing brace of this partial function is not part of the excerpt.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    // With write-ahead-log batching enabled the block is added on walBatchingThreadPool;
    // otherwise it is added inline.
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

/**
 * Register a receiver
 *
 * @param streamId         id of the stream the receiver belongs to; must be a known input
 *                         stream id, otherwise a SparkException is thrown
 * @param typ              receiver type name, used to build the receiver's display name
 * @param host             host the receiver is running on
 * @param executorId       id of the executor the receiver is running on
 * @param receiverEndpoint RPC endpoint reference of the receiver
 * @param senderAddress    address of the sender, used for logging only
 * @return true when the receiver was accepted and recorded; false when the tracker is
 *         stopping/stopped or the receiver runs on an executor it was not scheduled on
 */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  // Executor-level locations must match the executor id; other locations match on host.
  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    // Record the tracking info for this stream and notify listeners.
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
receiver的啓動,我們以ssc.socketTextStream("localhost", 9999)爲例,建立的是SocketReceiver對象。其onStart方法內部啓動一個守護線程來連接Socket Server,讀取socket的數據並調用store存儲。
/**
 * Receiver that reads records from a TCP socket.
 *
 * @param host           host to connect to
 * @param port           port to connect to
 * @param bytesToObjects function turning the socket's InputStream into an iterator of records
 * @param storageLevel   storage level passed to the Receiver base class
 * @tparam T type of the received records
 */
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  /** Spawns a daemon thread named "Socket Receiver" that runs receive(). */
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  /** Nothing to clean up here. */
  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself: its read loop exits once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      // Store records until the receiver is stopped or the stream runs dry.
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      // Leaving the loop while not stopped means the stream ended: ask for a restart.
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      // Close the socket if one was opened.
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
接下來回到JobScheduler的啓動過程的第三步啓動JobGenerator,啓動消息系統和定時器。按照batchInterval時間間隔定期發送GenerateJobs消息,消息循環體接收到該消息後回調generateJobs方法。generateJobs先根據批次時間爲該批次分配已接收的數據塊,然後調用DStreamGraph的generateJobs方法生成Job(注意這裏的Job不是Spark Core級別的Job,它只是基於DStreamGraph而生成的RDD的DAG而已),接着調用JobScheduler的submitJobSet方法,最後發送DoCheckpoint消息進行checkpoint操作。
// Recurring timer that posts a GenerateJobs event to the event loop once per batch
// duration (the batch interval passed in when the StreamingContext was created).
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the event loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that periodically triggers job generation
    startFirstTime()
  }
}

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  // The graph is started one batch duration before the timer's start time.
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Pick the received data that belongs to this batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Ask the DStreamGraph to generate the jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // A DoCheckpoint event is posted whether or not job generation succeeded.
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  // Only checkpoint on batches that fall on a multiple of the checkpoint duration.
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
DStreamGraph的generateJobs方法來調用輸出流的generateJob方法來生成Jobs集合。
// Output streams: the DStreams on which concrete output (action) operations were registered
private val outputStreams = new ArrayBuffer[DStream[_]]()

/**
 * Asks every output stream for its job at `time` and returns the jobs that were produced,
 * each tagged with the creation site of the stream it came from.
 */
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    for {
      stream <- outputStreams
      job <- stream.generateJob(time)
    } yield {
      job.setCallSite(stream.creationSite)
      job
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
來看下DStream的generateJob方法:它調用getOrCompute方法取得該批次時間對應的RDD(即DStream這個模板在該Interval被實例化成的RDD);如果有RDD,則封裝jobFunc方法,裏面包含context.sparkContext.runJob(rdd, emptyFunc),然後返回封裝後的Job。
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. The default implementation wraps the RDD obtained for
 * `time` in a job whose only work is to materialize that RDD; subclasses of DStream may
 * override this to generate their own jobs.
 *
 * @param time the batch time to generate a job for
 * @return the job, or None when no RDD is available for `time`
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time).map { rdd =>
    // Running a no-op function over every partition forces the RDD to be computed.
    val jobFunc = () => {
      val emptyFunc = { (iterator: Iterator[T]) => {} }
      context.sparkContext.runJob(rdd, emptyFunc)
    }
    new Job(time, jobFunc)
  }
}
接下來看JobScheduler的submitJobSet方法,向線程池中提交JobHandler。JobHandler實現了Runnable接口,最終調用job.run()方法。再看一下Job類的定義,其中run方法調用的func即構造Job時傳入的jobFunc,其中包含context.sparkContext.runJob(rdd, emptyFunc)操作,最終導致Job的提交。
/**
 * Submits every job of the given job set to `jobExecutor`, after posting a
 * StreamingListenerBatchSubmitted event and recording the set under its batch time.
 * An empty job set is only logged.
 */
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // Each job is wrapped in a JobHandler (a Runnable) and handed to the executor.
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

/** Runnable that runs one job, posting JobStarted / JobCompleted events around it. */
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        // Re-read eventLoop: it may have been cleared while the job was running.
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      // Clear the per-job local properties set above.
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
} // NOTE(review): presumably closes the enclosing class, whose header is outside this excerpt

// A job: a function `func` to run for batch time `time`. NOTE(review): the excerpt ends
// inside this class, so its remaining members and closing brace are not shown.
private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  // Runs the wrapped function and captures its outcome (value or exception) in _result.
  def run() {
    _result = Try(func())
  }