Spark Streaming: A Source-Code Deep Dive into the Timed Conversion of the Data Stream from currentBuffer to Block (Spark in Commercial Production)

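This blog series summarizes and shares cases drawn from real commercial environments, and offers Spark source-code walkthroughs together with practical guidance for production use. Please keep following the series. Copyright notice: this Spark source-code walkthrough and commercial practice series belongs to the author (秦凱新). Reproduction is prohibited; reading and learning are welcome.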

1 ReceiverTracker: one move that sets everything in motion

  • The figure below breaks down how ReceiverTracker submits the Job for the receiver RDD. The yellow area on the right is the focus of this section, ReceiverSupervisorImpl, which starts the receiver and generates Blocks.

  • As the figure below shows, the parent class ReceiverSupervisor, when started from startReceiverFunc, makes two startup calls:

    /** Start the supervisor */
    def start() {
      onStart()
      startReceiver()
    }

    (1) The first call is ReceiverSupervisorImpl's onStart() method, which starts the registeredBlockGenerators and so begins generating and managing batches of data.

    override protected def onStart() {
      registeredBlockGenerators.asScala.foreach { _.start() }
    }

    (2) The second call is startReceiver, which in turn calls ReceiverSupervisorImpl's onReceiverStart method to check whether the receiver registered successfully with ReceiverTracker. If it did, the receiver is started.

    The supervisor's startReceiver method:
    
    def startReceiver(): Unit = synchronized {
      try {
        if (onReceiverStart()) {           // key step: register the receiver over the RPC endpoint
        
          logInfo(s"Starting receiver $streamId")
          receiverState = Started
          
          receiver.onStart()                // key step: start the receiver itself
          
          logInfo(s"Called receiver $streamId onStart")
        } else {
          // The driver refused us
          stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
        }
      } catch {
        case NonFatal(t) =>
          stop("Error starting receiver " + streamId, Some(t))
      }
    }
    
    
      ReceiverSupervisorImpl's onReceiverStart method:
       
      override protected def onReceiverStart(): Boolean = {
          val msg = RegisterReceiver(
            streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
          trackerEndpoint.askSync[Boolean](msg)
        }

    (3) How ReceiverTracker handles the receiver's registration request

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      
        // Remote messages
        case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
          val successful =
            registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)  // key step: updates receiverTrackingInfos
            
          context.reply(successful)
          
        case AddBlock(receivedBlockInfo) =>
          if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
            walBatchingThreadPool.execute(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                if (active) {
                  context.reply(addBlock(receivedBlockInfo))
                } else {
                  throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
                }
              }
            })
          } else {
            context.reply(addBlock(receivedBlockInfo))
          }
          
        case DeregisterReceiver(streamId, message, error) =>
          deregisterReceiver(streamId, message, error)
          context.reply(true)
    
        // Local messages
        case AllReceiverIds =>
          context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
          
        case GetAllReceiverInfo =>
          context.reply(receiverTrackingInfos.toMap)
          
        case StopAllReceivers =>
          assert(isTrackerStopping || isTrackerStopped)
          stopReceivers()
          context.reply(true)
      }

    (4) How registerReceiver maintains receiverTrackingInfos

      /** Register a receiver */
      private def registerReceiver(
          streamId: Int,
          typ: String,
          host: String,
          executorId: String,
          receiverEndpoint: RpcEndpointRef,
          senderAddress: RpcAddress
        ): Boolean = {
        if (!receiverInputStreamIds.contains(streamId)) {
          throw new SparkException("Register received for unexpected id " + streamId)
        }
    
        if (isTrackerStopping || isTrackerStopped) {
          return false
        }
    
        val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
        val acceptableExecutors = if (scheduledLocations.nonEmpty) {
            // This receiver is registering and it's scheduled by
            // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
            scheduledLocations.get
          } else {
            // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
            // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
            scheduleReceiver(streamId)
          }
    
        def isAcceptable: Boolean = acceptableExecutors.exists {
          case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
          case loc: TaskLocation => loc.host == host
        }
    
        if (!isAcceptable) {
          // Refuse it since it's scheduled to a wrong executor
          false
        } else {
          val name = s"${typ}-${streamId}"
          val receiverTrackingInfo = ReceiverTrackingInfo(
            streamId,
            ReceiverState.ACTIVE,
            scheduledLocations = None,
            runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
            name = Some(name),
            endpoint = Some(receiverEndpoint))
            
          receiverTrackingInfos.put(streamId, receiverTrackingInfo)               // key step: record the now-active receiver
          
          listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
          logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
          true
        }
      }
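
The acceptance check in registerReceiver can be tried out in isolation. The following is a minimal sketch, not Spark's implementation: Location, ExecutorLocation, HostLocation, TrackingInfo and Tracker are hypothetical stand-ins for TaskLocation, ExecutorCacheTaskLocation, ReceiverTrackingInfo and ReceiverTracker, and the rescheduling branch is left out. It only shows the idea that a registration is accepted when the caller's executor or host matches a scheduled location, and that the tracking map is updated only in that case.

```scala
import scala.collection.mutable

object RegistrationSketch {
  // Hypothetical, simplified stand-ins for Spark's TaskLocation types.
  sealed trait Location
  final case class ExecutorLocation(host: String, executorId: String) extends Location
  final case class HostLocation(host: String) extends Location

  // Hypothetical, trimmed-down version of the per-receiver tracking record.
  final case class TrackingInfo(streamId: Int, active: Boolean, runningOn: Option[ExecutorLocation])

  class Tracker(scheduled: Map[Int, Seq[Location]]) {
    private val infos = mutable.HashMap.empty[Int, TrackingInfo]

    /** Returns true and records the receiver only if it runs where it was scheduled. */
    def register(streamId: Int, host: String, executorId: String): Boolean = {
      require(scheduled.contains(streamId), s"Register received for unexpected id $streamId")
      val acceptable = scheduled(streamId).exists {
        case ExecutorLocation(_, id) => id == executorId // scheduled to a specific executor
        case HostLocation(h)         => h == host        // scheduled to any executor on a host
      }
      if (acceptable) {
        infos.put(streamId, TrackingInfo(streamId, active = true, Some(ExecutorLocation(host, executorId))))
      }
      acceptable
    }

    def info(streamId: Int): Option[TrackingInfo] = infos.get(streamId)
  }
}
```

A receiver that reports from an unexpected host is refused and leaves the tracking map untouched, which corresponds to the "driver refused us" branch of startReceiver shown earlier.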

  • The overall architecture of ReceiverSupervisorImpl is shown in the figure below:

2 A deep dive into BlockGenerator

2.1 How SocketInputDStream receives and stores the data stream

  • It relies on ReceiverSupervisor: receive() hands every record to store()

    /** Create a socket connection and receive data until receiver is stopped */
       def receive() {
         try {
           val iterator = bytesToObjects(socket.getInputStream())
           while(!isStopped && iterator.hasNext) {
           
             store(iterator.next())                  // key step: hand each record to the supervisor
             
           }
           if (!isStopped()) {
             restart("Socket data stream had no more data")
           } else {
             logInfo("Stopped receiving")
           }
         } catch {
           case NonFatal(e) =>
             logWarning("Error receiving data", e)
             restart("Error receiving data", e)
         } finally {
           onStop()
         }
       }

  • It relies on ReceiverSupervisor's pushSingle method

    /**
     * Store a single item of received data to Spark's memory.
     * These single items will be aggregated together into data blocks before
     * being pushed into Spark's memory.
     */
    def store(dataItem: T) {
      supervisor.pushSingle(dataItem)             // key step: delegate to the supervisor
    }

  • It relies on the defaultBlockGenerator held inside ReceiverSupervisorImpl

    /** Push a single record of received data into block generator. */
    def pushSingle(data: Any) {
      defaultBlockGenerator.addData(data)           // key step: the record lands in the block generator
    }
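
The three hops above (receive → store → pushSingle → addData) can be sketched without Spark. This is only an illustration under simplifying assumptions: Generator, Supervisor and LineReceiver are hypothetical names, and the socket is replaced by a plain Iterator[String].

```scala
import scala.collection.mutable.ArrayBuffer

object StoreChainSketch {
  // Hypothetical stand-in for BlockGenerator: only collects records.
  class Generator {
    val buffer = new ArrayBuffer[Any]
    def addData(data: Any): Unit = synchronized { buffer += data }
  }

  // Hypothetical stand-in for the supervisor: forwards single records to the generator.
  class Supervisor(val generator: Generator) {
    def pushSingle(data: Any): Unit = generator.addData(data)
  }

  // Hypothetical stand-in for the socket receiver: reads records and stores each one.
  class LineReceiver(supervisor: Supervisor) {
    def store(item: String): Unit = supervisor.pushSingle(item)
    def receive(lines: Iterator[String]): Unit = lines.foreach(store)
  }
}
```

The point of the design is that the receiver never touches the buffer directly; it only knows its supervisor, which decides where the records go.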

2.2 BlockGenerator: plain on the surface, powerful underneath

  • Brother 1: blockIntervalTimer
  • Brother 2: blockPushingThread

BlockGenerator sets out with two threads, which between them take care of storing and managing blocks:

    private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")

    // Thread 1: a recurring timer that calls updateCurrentBuffer every block interval
    private val blockIntervalTimer =
      new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

    // Thread 2: a thread that keeps pushing the generated blocks
    private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  • Both brothers are started together

    def start(): Unit = synchronized {
         if (state == Initialized) {
           state = Active
    
           blockIntervalTimer.start()            // key step: start cutting blocks on a timer
           blockPushingThread.start()            // key step: start pushing the cut blocks
           
           logInfo("Started BlockGenerator")
         } else {
           throw new SparkException(
             s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
         }
       }    
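
The spark.streaming.blockInterval setting read above determines how many blocks one receiver cuts per batch, and each block becomes one partition of that batch's RDD. A rough back-of-the-envelope sketch follows. It assumes data arrives during every block interval (an empty buffer produces no block), and blocksPerBatch is a hypothetical helper, not a Spark API.

```scala
object BlockIntervalSketch {
  /** Upper bound on the blocks one receiver cuts per batch, assuming data arrives in every interval. */
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "blockIntervalMs must be positive")
    batchIntervalMs / blockIntervalMs
  }
}
```

With the default 200ms block interval and, say, a 2-second batch interval, BlockIntervalSketch.blocksPerBatch(2000L, 200L) gives 10, so a single receiver contributes at most about 10 partitions per batch.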

2.3 BlockGenerator: accumulate steadily, release in bursts

  • The water tank (holds the individual records of the stream as they trickle in): currentBuffer

    @volatile private var currentBuffer = new ArrayBuffer[Any]

  • The bottled water (the tank's contents gathered into buckets): blocksForPushing

    private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)

  • The tank is filled with the records coming from the InputDStream

    /** Push a single data item into the buffer. */
    def addData(data: Any): Unit = {
      if (state == Active) {
        waitToPush()
        synchronized {
          if (state == Active) {
            currentBuffer += data
          } else {
            throw new SparkException(
              "Cannot add data as BlockGenerator has not been started or has been stopped")
          }
        }
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }

  • blockIntervalTimer turns the contents of the tank into bottled water and queues it up

    /** Change the buffer to which single records are added to. */
        private def updateCurrentBuffer(time: Long): Unit = {
          try {
            var newBlock: Block = null
            synchronized {
              if (currentBuffer.nonEmpty) {
                val newBlockBuffer = currentBuffer
                currentBuffer = new ArrayBuffer[Any]
                
                val blockId = StreamBlockId(receiverId, time - blockIntervalMs) // key step: the id carries the start time of the interval
                
                listener.onGenerateBlock(blockId)
                
                newBlock = new Block(blockId, newBlockBuffer)         // key step: the old buffer becomes a Block
                
              }
            }
      
            if (newBlock != null) {
            
              blocksForPushing.put(newBlock)  // key step: put is blocking when queue is full
            }
          } catch {
            case ie: InterruptedException =>
              logInfo("Block updating timer thread was interrupted")
            case e: Exception =>
              reportError("Error in block updating thread", e)
          }
        }

  • keepPushingBlocks takes over from here
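
The hand-off performed by updateCurrentBuffer, swapping out the full buffer under a lock and queueing it as a block, can be reproduced in a few lines. The sketch below is an illustration only: MiniGenerator and its simplified Block are hypothetical, the timer is replaced by the caller invoking updateCurrentBuffer directly, and the listener callbacks and error handling of the real code are left out.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object BufferSwapSketch {
  // Hypothetical simplified block: the id is just the start time of the interval.
  final case class Block(id: Long, records: ArrayBuffer[Any])

  class MiniGenerator(blockIntervalMs: Long, queueSize: Int) {
    private var currentBuffer = new ArrayBuffer[Any]
    val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

    def addData(data: Any): Unit = synchronized { currentBuffer += data }

    /** Driven by a timer in the real code; here the caller passes the tick time. */
    def updateCurrentBuffer(time: Long): Unit = {
      // Swap the buffer while holding the lock so that addData never sees a half-built block.
      val newBlock: Option[Block] = synchronized {
        if (currentBuffer.nonEmpty) {
          val filled = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          Some(Block(time - blockIntervalMs, filled))
        } else None
      }
      // Queue outside the lock: put blocks when the queue is full.
      newBlock.foreach(b => blocksForPushing.put(b))
    }
  }
}
```

Queueing outside the synchronized section mirrors the original: a full queue then stalls only the timer thread, while addData can keep filling the fresh buffer.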

2.4 keepPushingBlocks inside BlockGenerator stirs things up

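  • The blockPushingThread thread calls defaultBlockGeneratorListener, which stores each Block through the BlockManager (blockManager.putIterator for buffered records, blockManager.putBytes for byte buffers) and reports it to the ReceiverTracker on the driver with an AddBlock(blockInfo) message, waiting for confirmation that it was added.
  • Through the endpoint communication described above, ReceiverTracker learns the storage metadata of every Block.
  • Depending on whether the write-ahead log is enabled, either WriteAheadLogBasedBlockHandler or BlockManagerBasedBlockHandler is used. The switch is read with conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false), where the key is spark.streaming.receiver.writeAheadLog.enable.
  • Everything is orchestrated by ReceiverSupervisorImpl, because its members include registeredBlockGenerators, defaultBlockGeneratorListener, trackerEndpoint and the other key players. From starting the receiver, to generating blocks, to managing them, to reporting them to ReceiverTracker, it covers the whole path.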
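
The post does not list keepPushingBlocks itself, so the following is only a sketch of the pattern the bullets describe: pick a handler according to the write-ahead-log flag, take blocks off the queue, store each one, and report the result. BlockHandler, MemoryHandler, WalHandler, chooseHandler and pushAll are hypothetical names; the report callback stands in for the AddBlock message, and the handlers merely return a string instead of storing anything.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object PushLoopSketch {
  final case class Block(id: Long, records: Seq[Any])

  // Hypothetical handler interface: returns a description of where the block was stored.
  trait BlockHandler { def storeBlock(block: Block): String }
  object MemoryHandler extends BlockHandler {
    def storeBlock(b: Block): String = s"memory:${b.id}"
  }
  object WalHandler extends BlockHandler {
    def storeBlock(b: Block): String = s"wal+memory:${b.id}"
  }

  /** Mirrors the idea of choosing the handler from the write-ahead-log flag. */
  def chooseHandler(walEnabled: Boolean): BlockHandler =
    if (walEnabled) WalHandler else MemoryHandler

  /** Drains the queue: store each block, then report it (stand-in for AddBlock). */
  def pushAll(queue: ArrayBlockingQueue[Block], handler: BlockHandler, report: String => Unit): Unit = {
    var next = queue.poll(10, TimeUnit.MILLISECONDS) // null once the queue stays empty
    while (next != null) {
      report(handler.storeBlock(next))
      next = queue.poll(10, TimeUnit.MILLISECONDS)
    }
  }
}
```

Polling with a short timeout rather than blocking forever lets such a loop notice a stop request between blocks, which matters when the generator has to shut down cleanly.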

3 Summary

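We have just watched ReceiverSupervisorImpl put on the show of the century: it directs and stars in its own production, and solves the problem end to end.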

秦凱新, Shenzhen, 2018
