Spark Customization, Part 5: Walking Through the Spark Streaming Runtime Source Code in One Lesson via a Case Study

 
Contents of this installment:
1. Review and demo of the case: dynamically computing the hottest items per category online
2. Walking through the Spark Streaming runtime source code via the case
 

1. Review and Demo of the Case: Dynamically Computing the Hottest Items per Category Online

Case review:
package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Use Spark Streaming + Spark SQL to compute, online and dynamically, the hottest items within each
  * category on an e-commerce site, e.g. the top three phones in the phone category and the top three
  * TVs in the TV category. This case is of great practical value in real production environments.
  *
  * @author DT大數據夢工廠
  * Sina Weibo: http://weibo.com/ilovepains/
  *
  * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use ML, SQL, GraphX and
  * other libraries is that it exposes interfaces such as foreachRDD and transform, which operate on RDDs
  * underneath. With the RDD as the cornerstone, all of Spark's other features can be used directly, as
  * easily as calling an API.
  * Assume the data format is: user item category, e.g. Rocky Samsung Android
  */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
    /**
      * Step 1: Create the SparkConf object to configure the Spark application at runtime, e.g. use
      * setMaster to set the URL of the Master of the Spark cluster the program connects to. If it is
      * set to local, the program runs locally, which is especially suitable for beginners whose
      * machines are poorly equipped (e.g. only 1 GB of memory).
      */
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // set the application name, shown in the monitoring UI while the program runs
    conf.setMaster("spark://Master:7077") // here the program runs on a Spark cluster
    //conf.setMaster("local[2]")
    // set batchDuration to control how frequently jobs are generated, and create the entry point for Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5))

    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

//    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
//      (v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))

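    // Note: the inverse-function variant of reduceByKeyAndWindow used below ("_-_") requires
    // checkpointing to be enabled, which is why ssc.checkpoint(...) is set above.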
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,_-_, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

        categoryItemDF.registerTempTable("categoryItemTable")

        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        resultDataFrame.show()

        val resultRowRDD = resultDataFrame.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
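            // one pooled connection per partition: every record in this partition reuses it,
            // and it is returned to the pool once the partition has been written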
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse

           }
          }
         }
       }
     }
    }
    /**
      * Calling start on the StreamingContext actually starts JobScheduler's start method and begins the
      * message loop. Inside JobScheduler.start, a JobGenerator and a ReceiverTracker are constructed and
      * their start methods are called:
      *   1. Once started, JobGenerator keeps generating Jobs according to batchDuration.
      *   2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (actually a
      *   ReceiverSupervisor is started on the Executor first). When a Receiver receives data, it is stored
      *   on the Executor through the ReceiverSupervisor, and the data's metadata is sent to the
      *   ReceiverTracker on the Driver, which manages the received metadata via ReceivedBlockTracker.
      *   Each batch interval produces a concrete Job. The Job here is not a Job in the Spark Core sense;
      *   it is only the RDD DAG generated from the DStreamGraph; in Java terms it is like a Runnable
      *   instance. To actually run it, the Job must be submitted to JobScheduler, where a dedicated thread
      *   from a thread pool submits it to the cluster (the real job execution is triggered in that thread
      *   by an action on the RDD). Why a thread pool?
      *   1. Jobs are generated continuously, so a thread pool improves efficiency, much like executing
      *   Tasks through a thread pool inside an Executor.
      *   2. FAIR scheduling may be configured for Jobs, which also requires multi-threading support.
      *
      */
    ssc.start()
    ssc.awaitTermination()
  }
}

  

2. Walking Through the Spark Streaming Runtime Source Code via the Case

Step-by-step source analysis:
1. Start from the instantiation of StreamingContext and step into the StreamingContext source code
// set batchDuration to control how frequently jobs are generated, and create the entry point for Spark Streaming execution
val ssc = new StreamingContext(conf, Seconds(5))


def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

Passing a SparkConf into the StreamingContext constructor creates a new SparkContext instance. This shows that Spark Streaming runs on top of Spark Core: a Spark Streaming program is simply an application running on Spark Core!
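For reference, StreamingContext also has a constructor that takes an existing SparkContext directly, which makes this relationship explicit. A minimal sketch (the appName here is just illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("StreamingOnCore"))
val ssc = new StreamingContext(sc, Seconds(5)) // Spark Streaming reuses the given SparkContext as its engine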
2. Step into the socketTextStream module:
val userClickLogsDStream = ssc.socketTextStream("Master", 9999) creates a SocketInputDStream. SocketInputDStream's getReceiver method returns the receiver used to ingest data: getReceiver instantiates a SocketReceiver, whose onStart method creates a thread that calls the receive method, where the actual data receiving is implemented.
The corresponding code is as follows:
1. Creating the socketTextStream
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}


/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

2. SocketInputDStream extends ReceiverInputDStream and implements the getReceiver method, which returns a SocketReceiver object

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

3. The inheritance chain of SocketInputDStream is: SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream

 

4. Among DStream's subclasses there is ForEachDStream, the output class of DStream. When data is output, a Job is created in ForEachDStream's generateJob method (a sketch of how foreachRDD wires into ForEachDStream follows the code below):

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
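
For context, here is a simplified paraphrase of how foreachRDD ends up creating and registering such an output stream (based on the Spark 1.x DStream source; treat the exact signatures as a sketch):

// DStream.foreachRDD (simplified): wrap the user function and register a
// ForEachDStream as an output stream on the DStreamGraph
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false),
    displayInnerRDDOps = true
  ).register() // register() adds this stream to DStreamGraph.outputStreams
}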

Next, let's look at the getOrCompute method. As the method's doc comment indicates, the generated RDD is cached:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
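
For reference, generatedRDDs is the per-DStream cache keyed by batch time. In the Spark 1.x DStream source it is declared roughly as follows (quoted from memory, so treat it as a sketch):

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()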

5. Everything between SocketInputDStream and ssc.start() is business-logic code, so we go straight into the ssc.start() method. start() starts the StreamingContext; since a Spark application cannot have multiple active SparkContext instances, the Spark Streaming framework checks its state at startup.

/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          // Note: ThreadLocal gives each thread its own private properties; setting them here
          // does not affect the current thread or other threads.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

The scheduler.start() call above is JobScheduler's start method, which starts the message loop, the listener bus, the ReceiverTracker and the InputInfoTracker. The corresponding code:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // the message-driven (event) system
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // start the message-loop processing thread
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // start the ReceiverTracker
  receiverTracker.start()
  // start the JobGenerator
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

The message handler processes three kinds of messages: job started, job completed, and error reported.

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

First, look at how ReceiverTracker starts: internally it instantiates the ReceiverTrackerEndpoint RPC endpoint.

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

 

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {}

During ReceiverTracker's startup, its launchReceivers method is called.

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

It calls the runDummySparkJob method to launch the Spark Streaming framework's first Job; the collect action triggers the execution of that Spark job. This method makes sure every slave has registered, so that all Receivers do not end up scheduled on the same node, which balances the load for the later computation.

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

  

It also calls endpoint.send(StartAllReceivers(receivers)), sending the StartAllReceivers message through the RPC endpoint. When ReceiverTrackerEndpoint receives this message, it first determines, according to the scheduling policy, which Executor each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receiver.

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

In the startReceiver method, ssc.sparkContext.submitJob is passed the startReceiverFunc function when submitting the Job, because startReceiverFunc is executed on the Executor. Inside startReceiverFunc a ReceiverSupervisorImpl object is instantiated, which manages and monitors the Receiver. This Job is the second Job the Spark Streaming framework starts for us, and it keeps running, because supervisor.awaitTermination() blocks until exit.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // instantiate the Receiver supervisor
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

Next, look at how ReceiverSupervisorImpl starts: it first starts all registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and then calls the receiver's onStart method.

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

 

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

 
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

When the ReceiverTrackerEndpoint running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint into a ReceiverTrackingInfo and saves it in the in-memory HashMap receiverTrackingInfos.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

 

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}

  

As for starting the receiver, take ssc.socketTextStream("localhost", 9999) as an example: it creates a SocketReceiver object, which internally starts a thread that connects to the socket server, reads the socket data, and stores it.

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}

Now back to the third step of JobScheduler's startup: starting the JobGenerator, which starts its message system and a timer. The timer periodically sends a GenerateJobs message at every batchInterval; when the message loop receives it, it calls back the generateJobs method, which fetches the data for that specific time, calls DStreamGraph's generateJobs method to generate the Jobs (again, the Job here is not a Spark Core Job, but only the RDD DAG derived from the DStreamGraph), then calls JobScheduler's submitJobSet method, and finally sends a DoCheckpoint message to perform the checkpoint operation.

// periodically send GenerateJobs messages according to the batchInterval passed in when the StreamingContext was created
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")


/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // start the message-loop processing thread
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // start the timer that periodically generates Jobs
    startFirstTime()
  }
}


/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}



/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}


/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {

    // allocate the received data (blocks) to this batch according to the given time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // call DStreamGraph.generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}


/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

  


DStreamGraph's generateJobs method calls each output stream's generateJob method to produce the collection of Jobs.
 

 

// output streams: the concrete output (action) operations
private val outputStreams = new ArrayBuffer[DStream[_]]()

 
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}


Now look at DStream's generateJob method: it calls getOrCompute to obtain the RDD that the DStreamGraph has materialized for the current batch interval. If there is an RDD, it wraps a jobFunc containing context.sparkContext.runJob(rdd, emptyFunc) and returns the wrapped Job.
 

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

Next, look at JobScheduler's submitJobSet method, which submits a JobHandler to the thread pool. JobHandler implements the Runnable interface and ultimately calls job.run(). Looking at the definition of the Job class, the func invoked inside run is the jobFunc passed in when the Job was constructed; it contains the context.sparkContext.runJob(rdd, emptyFunc) call, which finally causes the job to be submitted.

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
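
As background on that thread pool: in the Spark 1.x source, jobExecutor is declared in JobScheduler roughly as below (a sketch from memory); spark.streaming.concurrentJobs controls how many batch jobs may run concurrently, which also matters when FAIR scheduling is used:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")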

 
private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}

 

private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }

At this point, the Job has been submitted to the Spark cluster through SparkContext's runJob method!
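
To recap the path just traced (StreamingContext -> JobScheduler -> ReceiverTracker/JobGenerator -> JobHandler -> SparkContext.runJob), here is a minimal self-contained driver sketch that exercises exactly this pipeline; local[2] and localhost:9999 are illustrative assumptions, and the socket source can be fed with a tool such as netcat:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingPath {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MinimalStreamingPath")
    val ssc = new StreamingContext(conf, Seconds(5))      // creates the SparkContext underneath

    val lines = ssc.socketTextStream("localhost", 9999)   // SocketInputDStream -> SocketReceiver
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()                                         // registers a ForEachDStream output stream

    ssc.start()            // JobScheduler starts ReceiverTracker and JobGenerator
    ssc.awaitTermination() // every 5s a Job (an RDD DAG) is generated, handed to a JobHandler,
                           // and submitted to the cluster via SparkContext.runJob
  }
}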

  

Special thanks to Teacher Wang Jialin (王家林) for his distinctive lectures.

Wang Jialin's contact card:

China's foremost Spark expert

Sina Weibo: http://weibo.com/ilovepains

WeChat official account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

QQ: 1740415547

YY classroom: live lectures every day at 20:00 in channel 68917580
