A Source-Code Analysis of the ReceiverTracker Startup Process and the receiver RDD Job Submission Mechanism

This blog series distills and shares cases drawn from real production environments, together with Spark source-code analysis and guidance for commercial practice. Please keep following the series. Copyright notice: this Spark source-code analysis and commercial-practice series belongs to the author (秦凱新); reproduction is prohibited, but you are welcome to learn from it.

1 What is ReceiverTracker?

  • On the Driver side there is a tracker, ReceiverTracker, which continuously supervises the launching of Receivers on the Executors (for example, by sending the receiver RDD to an Executor — note that the Receiver is shipped to the Executor as an RDD), and which manages the metadata of the data Blocks waiting to be assigned to Jobs (a Driver-side wiring sketch follows this list).
  • ReceiverTracker itself is only a global manager, responsible for the management of Block metadata.
  • What actually runs on the Executor is the Receiver. A Receiver runs without interruption: it turns the discrete incoming data into batches, then into Blocks, which are finally taken over by the storage subsystem (BlockManager). Finally, the Executor notifies the Driver-side ReceiverTracker of the metadata of the Blocks it has saved.
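
To make that Driver-side entry point concrete, here is a minimal wiring sketch. It uses only public Spark Streaming APIs (StreamingContext, socketTextStream); the application name and the local socket address are assumptions for illustration. socketTextStream registers a socket-based Receiver, and once ssc.start() is called, ReceiverTracker wraps that receiver in an RDD and submits it as a long-running job to an Executor.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ReceiverWiringExample {
      def main(args: Array[String]): Unit = {
        // local[2]: one core for the receiver task, one for batch processing
        val conf = new SparkConf().setMaster("local[2]").setAppName("ReceiverWiringExample")
        val ssc = new StreamingContext(conf, Seconds(5))

        // socketTextStream registers a ReceiverInputDStream backed by a socket receiver;
        // ssc.start() triggers ReceiverTracker.start(), which ships the receiver
        // to an Executor as a one-partition RDD job.
        val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
        lines.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }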

2 What is Receiver?

    /**
     * Abstract class of a receiver that can be run on worker nodes to receive external data. A
     * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
     * should define the setup steps necessary to start receiving data,
     * and `onStop()` should define the cleanup steps necessary to stop receiving data.
     * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
     * or stopped completely by `stop(...)`.
     */

2.1 The Receiver "super template"

  • _supervisor: ReceiverSupervisor — boundlessly powerful once started, holding everything together; it mainly manages data storage on the Executor side. (Analyzed in depth in the next section.)

  • store(): as the name suggests, stores the batches of the data stream.

    @DeveloperApi
    abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
      
        /**
         * This method is called by the system when the receiver is started. This function
         * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
         * This function must be non-blocking, so receiving the data must occur on a different
         * thread. Received data can be stored with Spark by calling `store(data)`.
         *
         * If there are errors in threads started here, then following options can be done
         * (i) `reportError(...)` can be called to report the error to the driver.
         * The receiving of data will continue uninterrupted.
         * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
         * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
         * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
         * immediately, and then `onStart()` after a delay.
         */
        def onStart(): Unit
      
        /**
         * This method is called by the system when the receiver is stopped. All resources
         * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
         */
        def onStop(): Unit
      
        /** Override this to specify a preferred location (hostname). */
        def preferredLocation: Option[String] = None
      
        /**
         * Store a single item of received data to Spark's memory.
         * These single items will be aggregated together into data blocks before
         * being pushed into Spark's memory.
         */
        def store(dataItem: T) {
          supervisor.pushSingle(dataItem)
        }
      
        /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
        def store(dataBuffer: ArrayBuffer[T]) {
          supervisor.pushArrayBuffer(dataBuffer, None, None)
        }
      
        /**
         * Store an ArrayBuffer of received data as a data block into Spark's memory.
         * The metadata will be associated with this block of data
         * for being used in the corresponding InputDStream.
         */
        def store(dataBuffer: ArrayBuffer[T], metadata: Any) {
          supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
        }
      
        /** Store an iterator of received data as a data block into Spark's memory. */
        def store(dataIterator: Iterator[T]) {
          supervisor.pushIterator(dataIterator, None, None)
        }
      
        /**
         * Store an iterator of received data as a data block into Spark's memory.
         * The metadata will be associated with this block of data
         * for being used in the corresponding InputDStream.
         */
        def store(dataIterator: java.util.Iterator[T], metadata: Any) {
          supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
        }
      
        /** Store an iterator of received data as a data block into Spark's memory. */
        def store(dataIterator: java.util.Iterator[T]) {
          supervisor.pushIterator(dataIterator.asScala, None, None)
        }
      
        /**
         * Store an iterator of received data as a data block into Spark's memory.
         * The metadata will be associated with this block of data
         * for being used in the corresponding InputDStream.
         */
        def store(dataIterator: Iterator[T], metadata: Any) {
          supervisor.pushIterator(dataIterator, Some(metadata), None)
        }
      
        /**
         * Store the bytes of received data as a data block into Spark's memory. Note
         * that the data in the ByteBuffer must be serialized using the same serializer
         * that Spark is configured to use.
         */
        def store(bytes: ByteBuffer) {
          supervisor.pushBytes(bytes, None, None)
        }
      
        /**
         * Store the bytes of received data as a data block into Spark's memory.
         * The metadata will be associated with this block of data
         * for being used in the corresponding InputDStream.
         */
        def store(bytes: ByteBuffer, metadata: Any) {
          supervisor.pushBytes(bytes, Some(metadata), None)
        }
      
        /** Report exceptions in receiving data. */
        def reportError(message: String, throwable: Throwable) {
          supervisor.reportError(message, throwable)
        }
      
        /**
         * Restart the receiver. This method schedules the restart and returns
         * immediately. The stopping and subsequent starting of the receiver
         * (by calling `onStop()` and `onStart()`) is performed asynchronously
         * in a background thread. The delay between the stopping and the starting
         * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
         * The `message` will be reported to the driver.
         */
        def restart(message: String) {
          supervisor.restartReceiver(message)
        }
      
        /**
         * Restart the receiver. This method schedules the restart and returns
         * immediately. The stopping and subsequent starting of the receiver
         * (by calling `onStop()` and `onStart()`) is performed asynchronously
         * in a background thread. The delay between the stopping and the starting
         * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
         * The `message` and `exception` will be reported to the driver.
         */
        def restart(message: String, error: Throwable) {
          supervisor.restartReceiver(message, Some(error))
        }
      
        /**
         * Restart the receiver. This method schedules the restart and returns
         * immediately. The stopping and subsequent starting of the receiver
         * (by calling `onStop()` and `onStart()`) is performed asynchronously
         * in a background thread.
         */
        def restart(message: String, error: Throwable, millisecond: Int) {
          supervisor.restartReceiver(message, Some(error), millisecond)
        }
      
        /** Stop the receiver completely. */
        def stop(message: String) {
          supervisor.stop(message, None)
        }
      
        /** Stop the receiver completely due to an exception */
        def stop(message: String, error: Throwable) {
          supervisor.stop(message, Some(error))
        }
      
        /** Check if the receiver has started or not. */
        def isStarted(): Boolean = {
          supervisor.isReceiverStarted()
        }
      
        /**
         * Check if receiver has been marked for stopping. Use this to identify when
         * the receiving of data should be stopped.
         */
        def isStopped(): Boolean = {
          supervisor.isReceiverStopped()
        }
      
        /**
         * Get the unique identifier the receiver input stream that this
         * receiver is associated with.
         */
        def streamId: Int = id
      
        /*
         * =================
         * Private methods
         * =================
         */
      
        /** Identifier of the stream this receiver is associated with. */
        private var id: Int = -1
      
        /** Handler object that runs the receiver. This is instantiated lazily in the worker. */
        @transient private var _supervisor: ReceiverSupervisor = null
      
        /** Set the ID of the DStream that this receiver is associated with. */
        private[streaming] def setReceiverId(_id: Int) {
          id = _id
        }
      
        /** Attach Network Receiver executor to this receiver. */
        private[streaming] def attachSupervisor(exec: ReceiverSupervisor) {
          assert(_supervisor == null)
          _supervisor = exec
        }
      
        /** Get the attached supervisor. */
        private[streaming] def supervisor: ReceiverSupervisor = {
          assert(_supervisor != null,
            "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
              "some computation in the receiver before the Receiver.onStart() has been called.")
          _supervisor
        }
    複製代碼
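
As a concrete instance of this template, here is a minimal custom receiver sketch, closely modeled on the example in the Spark Streaming documentation. SocketLineReceiver is a hypothetical name, and the host/port are assumptions; everything else uses only the Receiver API shown above. It illustrates the contract: onStart() must not block, so the actual reading happens on a separate thread that calls store() per record and hands failures to restart(...).

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // A line-oriented socket receiver: reading runs on its own thread.
    class SocketLineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        new Thread("Socket Line Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      def onStop(): Unit = {
        // The reading thread checks isStopped() and exits on its own,
        // so there is nothing to clean up here.
      }

      private def receive(): Unit = {
        try {
          val socket = new Socket(host, port)
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            store(line) // single items are aggregated into blocks by the supervisor
            line = reader.readLine()
          }
          reader.close()
          socket.close()
          // Ask the supervisor to schedule an asynchronous restart when the stream ends.
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            restart("Error receiving data", t)
        }
      }
    }

Such a receiver would be plugged in on the Driver side with ssc.receiverStream(new SocketLineReceiver(host, port)).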

2.2 The Receiver class hierarchy

Concrete receivers all extend this abstract class — for example, the SocketReceiver behind socketTextStream, as well as the Kafka and Flume receivers in their respective connector modules.

3 ReceiverTracker in Depth

3.1 The ReceiverTracker startup flow

  • Sets up the RPC endpoint, which makes it convenient to monitor the state of the whole Streaming application.

  • launchReceivers(): sends a StartAllReceivers message to its own endpoint; the receivers are then wrapped into a receiver RDD, which is submitted as a Job by SparkContext, so that the Receivers finally get started on the Executors.

    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

        if (!skipReceiverLaunch) launchReceivers()         <= a stroke of genius

        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }

3.2 ReceiverTracker.launchReceivers is a messenger

As a messenger, its whole job is to send the StartAllReceivers(receivers) message.

    /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def launchReceivers(): Unit = {
      val receivers = receiverInputStreams.map { nis =>
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      }                                                    <= a stroke of genius

      runDummySparkJob()

      logInfo("Starting " + receivers.length + " receivers")
      endpoint.send(StartAllReceivers(receivers))          <= a stroke of genius
    }

3.3 ReceiverTracker receives and handles its own messages

    /** RpcEndpoint to receive messages from the receivers. */
    private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
    
        private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
          ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
    
        @volatile private var active: Boolean = true
    
        override def receive: PartialFunction[Any, Unit] = {
        
          // Local messages
          case StartAllReceivers(receivers) =>
          
            val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
               <= a stroke of genius (picks the optimal locations, deciding on which Executor each receiver starts)
          
            
            for (receiver <- receivers) {
            
              val executors = scheduledLocations(receiver.streamId)          <= a stroke of genius
              updateReceiverScheduledExecutors(receiver.streamId, executors)
              receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
              
              
              startReceiver(receiver, executors)        <= a stroke of genius (loops over all receivers, starting each at its optimal location)
              
              
            }
          case RestartReceiver(receiver) =>
            // Old scheduled executors minus the ones that are not active any more
            val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
            val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
                // Try global scheduling again
                oldScheduledExecutors
              } else {
                val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
                // Clear "scheduledLocations" to indicate we are going to do local scheduling
                val newReceiverInfo = oldReceiverInfo.copy(
                  state = ReceiverState.INACTIVE, scheduledLocations = None)
                receiverTrackingInfos(receiver.streamId) = newReceiverInfo
                schedulingPolicy.rescheduleReceiver(
                  receiver.streamId,
                  receiver.preferredLocation,
                  receiverTrackingInfos,
                  getExecutors)
              }
            // Assume there is one receiver restarting at one time, so we don't need to update
            // receiverTrackingInfos
            startReceiver(receiver, scheduledLocations)
          case c: CleanupOldBlocks =>
            receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
          case UpdateReceiverRateLimit(streamUID, newRate) =>
            for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
              eP.send(UpdateRateLimit(newRate))
            }
          // Remote messages
          case ReportError(streamId, message, error) =>
            reportError(streamId, message, error)
        }
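The call to schedulingPolicy.scheduleReceivers above is where receiver placement is decided. The following is a deliberately simplified, hypothetical sketch of that idea — honor a receiver's preferredLocation when an executor on that host exists, otherwise spread receivers round-robin. Spark's actual ReceiverSchedulingPolicy additionally balances weighted load across executors; none of the names below are Spark APIs.

    // Simplified placement sketch: (streamId, preferredLocation) pairs in,
    // a streamId -> candidate-executor mapping out.
    object NaiveReceiverScheduler {
      def schedule(
          receivers: Seq[(Int, Option[String])],   // (streamId, preferredLocation)
          executors: IndexedSeq[String]            // "host:port"-style executor ids (assumed)
        ): Map[Int, Seq[String]] = {
        require(executors.nonEmpty, "no executors registered yet")
        var next = 0
        receivers.map { case (streamId, preferred) =>
          // Prefer executors on the receiver's preferred host, if any exist.
          val candidates = preferred match {
            case Some(host) =>
              val onHost = executors.filter(_.startsWith(host))
              if (onHost.nonEmpty) onHost else executors
            case None => executors
          }
          // Round-robin over the candidates so receivers spread out.
          val chosen = candidates(next % candidates.size)
          next += 1
          streamId -> Seq(chosen)
        }.toMap
      }
    }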

3.4 ReceiverTracker launches receivers onto the Executors

  • startReceiverFunc: the function of the receiverRDD, executed mainly on the Executor. Its body contains the ReceiverSupervisorImpl that needs to be started; ReceiverSupervisorImpl is the concrete executor of data reception, and internally it keeps a registry of BlockGenerators:

    private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
  • Inside BlockGenerator, blockIntervalTimer and blockPushingThread drive the entire batch-to-Block pipeline of the real-time data stream (a simplified sketch follows below).
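
To illustrate how those two components cooperate, here is a stripped-down, hypothetical sketch (MiniBlockGenerator is not a Spark class): a timer seals the current buffer into a block every blockInterval, and a pushing thread hands finished blocks onward. Spark's real BlockGenerator adds listeners, rate limiting, and WAL hooks on top of this idea.

    import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
    import scala.collection.mutable.ArrayBuffer

    class MiniBlockGenerator[T](blockIntervalMs: Long, pushBlock: Seq[T] => Unit) {
      @volatile private var buffer = new ArrayBuffer[T]
      private val blocksForPushing = new ArrayBlockingQueue[Seq[T]](10)
      private val timer = Executors.newSingleThreadScheduledExecutor()

      // Role of blockPushingThread: hand each sealed block to storage.
      private val blockPushingThread = new Thread("mini-block-pushing") {
        override def run(): Unit =
          while (!Thread.currentThread().isInterrupted) {
            val block = blocksForPushing.take()   // waits until a block is ready
            pushBlock(block)
          }
      }

      def start(): Unit = {
        // Role of blockIntervalTimer: seal the buffer into a block every interval.
        timer.scheduleAtFixedRate(new Runnable {
          def run(): Unit = updateCurrentBuffer()
        }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
        blockPushingThread.start()
      }

      /** Called by the receiver thread for every record (cf. pushSingle). */
      def addData(data: T): Unit = synchronized { buffer += data }

      private def updateCurrentBuffer(): Unit = synchronized {
        if (buffer.nonEmpty) {
          blocksForPushing.put(buffer.toSeq)   // hand the sealed block to the pusher
          buffer = new ArrayBuffer[T]
        }
      }
    }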

  • The core code of startReceiver: from the receiverRDD to Job submission, ending with ReceiverSupervisorImpl.start() on the executor side.

    /** Start a receiver along with its scheduled executors. */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
          
        def shouldStartReceiver: Boolean = {
          // It's okay to start when trackerState is Initialized or Started
          !(isTrackerStopping || isTrackerStopped)
        }
    
        val receiverId = receiver.streamId
        if (!shouldStartReceiver) {
          onReceiverJobFinish(receiverId)
          return
        }
    
        val checkpointDirOption = Option(ssc.checkpointDir)
        val serializableHadoopConf =
          new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
    
        // Function to start the receiver on the worker node
        val startReceiverFunc: Iterator[Receiver[_]] => Unit =
          (iterator: Iterator[Receiver[_]]) => {
            if (!iterator.hasNext) {
              throw new SparkException(
                "Could not start receiver as object not found.")
            }
            if (TaskContext.get().attemptNumber() == 0) {
              val receiver = iterator.next()
              assert(iterator.hasNext == false)
              val supervisor = new ReceiverSupervisorImpl(
                receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
              supervisor.start()
              supervisor.awaitTermination()
            } else {
              // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
            }
          }
    
    
        // Create the RDD using the scheduledLocations to run the receiver in a Spark job
        val receiverRDD: RDD[Receiver[_]] =
          if (scheduledLocations.isEmpty) {
            ssc.sc.makeRDD(Seq(receiver), 1)
          } else {
            val preferredLocations = scheduledLocations.map(_.toString).distinct
            ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
          }
        receiverRDD.setName(s"Receiver $receiverId")
        ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
        ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
    
        val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
          receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
        // We will keep restarting the receiver job until ReceiverTracker is stopped
        future.onComplete {
          case Success(_) =>
            if (!shouldStartReceiver) {
              onReceiverJobFinish(receiverId)
            } else {
              logInfo(s"Restarting Receiver $receiverId")
              self.send(RestartReceiver(receiver))
            }
          case Failure(e) =>
            if (!shouldStartReceiver) {
              onReceiverJobFinish(receiverId)
            } else {
              logError("Receiver has been stopped. Try to restart it.", e)
              logInfo(s"Restarting Receiver $receiverId")
              self.send(RestartReceiver(receiver))
            }
        }(ThreadUtils.sameThread)
        logInfo(s"Receiver ${receiver.streamId} started")
      }
  • The architecture of the receiver RDD inside ReceiverTracker

The following diagram dissects how ReceiverTracker implements the Job submission flow for the receiver RDD. In outline: start() creates the ReceiverTrackerEndpoint and calls launchReceivers(); launchReceivers() sends StartAllReceivers to the endpoint; the endpoint schedules each receiver via schedulingPolicy.scheduleReceivers and calls startReceiver(); startReceiver() wraps the receiver in a one-partition RDD with the scheduled locations as preferred locations and submits it with SparkContext.submitJob; on the chosen Executor, startReceiverFunc starts a ReceiverSupervisorImpl and blocks in awaitTermination(); when the job completes, the future's onComplete callback sends RestartReceiver so the receiver keeps running until the tracker is stopped.

4 Summary

In the end, the ReceiverTracker on the Driver side is responsible for starting the receivers on different Executors, so that the receivers can, without interruption, batch the incoming data, store it as Blocks, and hand those over to BlockManager for management.

秦凱新 (Qin Kaixin), Shenzhen, 2018
