Spark Streaming

The core mechanism of Spark Streaming: a background thread receives the stream of source data, stores what it receives in the BlockManager, packages it into batch RDDs at the configured time interval, and then runs the various operations on those RDDs.
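As a concrete illustration, here is a minimal runnable driver sketch (the socket source at localhost:9999 and the 1-second batch interval are illustrative choices, not from the original text): every second, the data received so far is packaged into a batch RDD and the RDD-style operations below run on it.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BatchedWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("BatchedWordCount").setMaster("local[2]")
        // Every 1 second, the received data is packaged into a batch RDD.
        val ssc = new StreamingContext(conf, Seconds(1))

        // Receiver-based source: incoming records land in the BlockManager
        // until the batch interval fires.
        val lines = ssc.socketTextStream("localhost", 9999)

        // Ordinary RDD-style operations, applied to each batch RDD.
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }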

Each data source provides its own Receiver implementation, which saves the received data by calling the store method. store delegates to ReceiverSupervisorImpl to persist the actual data, and ReceiverSupervisorImpl in turn calls methods such as pushSingle to add the data to a BlockGenerator. The BlockGenerator starts a RecurringTimer that fires at the interval configured by the application; on each tick, all data received so far is taken out of the BlockGenerator's buffer and used to construct the batch of RDDs. Throughout this pipeline, events are propagated via a BlockGeneratorListener.
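To make the store path concrete, here is a minimal custom Receiver sketch (SimpleSocketReceiver is a hypothetical name, not a Spark class; the socket source is just for illustration). Every call to store hands one record to the attached ReceiverSupervisorImpl, which uses pushSingle to feed the BlockGenerator:

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class SimpleSocketReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {
        // Run the blocking receive loop on its own thread so onStart() returns quickly.
        new Thread("SimpleSocketReceiver") {
          override def run(): Unit = receive()
        }.start()
      }

      override def onStop(): Unit = {
        // Nothing to do: the receive loop checks isStopped() and exits on its own.
      }

      private def receive(): Unit = {
        try {
          val socket = new Socket(host, port)
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            // store() hands the record to ReceiverSupervisorImpl, which calls
            // pushSingle() to append it to the BlockGenerator's current buffer.
            store(line)
            line = reader.readLine()
          }
          reader.close()
          socket.close()
          restart("Trying to connect again")
        } catch {
          case e: java.io.IOException => restart("Error receiving data", e)
        }
      }
    }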

Each real-time data source also needs to implement its own subclass of DStream; see the wiring sketch below.
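A receiver like the sketch above is wired into the job through StreamingContext.receiverStream, which wraps it in a ReceiverInputDStream behind the scenes. Continuing the hypothetical example, with the ssc created earlier:

    // receiverStream returns a ReceiverInputDStream[String] backed by the
    // hypothetical SimpleSocketReceiver from the sketch above.
    val customLines = ssc.receiverStream(new SimpleSocketReceiver("localhost", 9999))
    customLines.count().print()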

    1. BlockGenerator

Defined in receiver/BlockGenerator.scala. It is responsible for receiving the DStream's data and saving it into blocksForPushing. For each real-time stream to be received, a BlockGenerator is started that quietly accepts data in the background and saves it to the BlockManager, while producing Blocks and appending them to the blocksForPushing queue.
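For context, the push side is set up roughly like this (condensed from memory of the same file; exact form varies across Spark versions): blocksForPushing is a bounded queue, and a dedicated background thread keeps draining it, handing each Block to the listener so that ReceiverSupervisorImpl can store it via the BlockManager.

    // Condensed sketch, not a verbatim quote (details are version-dependent).
    private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
    private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
    private val blockPushingThread = new Thread() {
      override def run(): Unit = keepPushingBlocks()
    }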

BlockGenerator's default time interval is 200ms and can be configured. The timer callback code is as follows:

The time argument passed to the callback is always an integer multiple of the timer period; time - period identifies the interval that just ended and becomes the uniqueId of the generated block's StreamBlockId.

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          // Swap in a fresh buffer so the receiving thread can keep appending,
          // then wrap everything accumulated so far into a new Block.
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
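The timer that drives this callback is wired up roughly as follows (again condensed from memory of receiver/BlockGenerator.scala; details may differ by Spark version). This is also where the 200ms default and its configuration key come from: the interval is read from spark.streaming.blockInterval, and the RecurringTimer invokes updateCurrentBuffer with timestamps that are exact multiples of that interval.

    // Condensed sketch, not a verbatim quote (details are version-dependent).
    private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
    private val blockIntervalTimer =
      new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")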

    2. ReceiverInputDStream

The base class of many receiver-based input streams. Its compute method generates the RDD[T] for each batch: it asks the receiverTracker for the data belonging to the current batch time (validTime), which is normally stored in the BlockManager, and wraps it into a BlockRDD[T].
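A condensed view of that compute method (paraphrased from memory of the Spark source; bookkeeping such as InputInfoTracker reporting is omitted, and details differ across versions):

    // Condensed sketch of ReceiverInputDStream.compute, not a verbatim quote.
    override def compute(validTime: Time): Option[RDD[T]] = {
      val blockRDD =
        if (validTime < graph.startTime) {
          // Before the context's start time there is no data: empty BlockRDD.
          new BlockRDD[T](ssc.sc, Array.empty)
        } else {
          // Ask the ReceiverTracker which blocks were allocated to this stream
          // for this batch time, then wrap them into a BlockRDD.
          val receiverTracker = ssc.scheduler.receiverTracker
          val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
          createBlockRDD(validTime, blockInfos)
        }
      Some(blockRDD)
    }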
