經過案例對SparkStreaming透徹理解-3

本期內容:多線程

  1. 解密Spark Streaming Job架構和運行機制架構

  2. 解密Spark Streaming 容錯架構和運行機制框架

 

  一切不能進行實時流處理的數據都是無效的數據。在流處理時代,SparkStreaming有着強大吸引力,並且發展前景廣闊,加之Spark的生態系統,Streaming能夠方便調用其餘的諸如SQL,MLlib等強大框架,它必將一統天下。socket

  Spark Streaming運行時與其說是Spark Core上的一個流式處理框架,不如說是Spark Core上的一個最複雜的應用程序。若是能夠掌握Spark streaming這個複雜的應用程序,那麼其餘的再複雜的應用程序都不在話下了。這裏選擇Spark Streaming做爲版本定製的切入點也是大勢所趨。分佈式

    本節課經過從job和容錯的總體架構上來考察Spark Streaming的運行機制。ide

用以前已有的最簡單的例子:oop

// Socket來源的單詞計數
// YY課堂:天天20:00現場授課頻道68917580
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()

 

跟蹤源碼能夠發現:ui

在初始化 StreamingContext時,建立了以下幾個對象:this

// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)

 

而JobScheduler在初始化的時候,會初始化jobGenerator,且包含receiverTracker。spa

// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // 56

 

再看建立DStream的部分

// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}

 

 

// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // 這個方法是關鍵
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

 

再看 ssc.start

// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

 

第610行,調用了scheduler.start,scheduler就是以前初始化是產生的JobScheduler。

// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

請看80行,將receiverTracker初始化:

// ReceiverTracker.scala line 101
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )

 

調用receiverTracker.start和jobGenerator.star

// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

 

launchReceivers()

// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver() // 這個就是SocketInputDStream.getReceiver(),本例中是SocketReceiver ,見SocketInputDStream.scala line 34
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}

 

看看StartAllReceivers是如何被消費的?

// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // 儘可能負載均勻
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors) // 啓動接收器,再也不進一步深究,有興趣的能夠繼續查看源碼
  }

 

再回到JobScheduler.scala line 83,jobGenerator.start

// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

 

至此消息接收和Job生成器已啓動。

 

在StreamingContext調用start方法的內部實際上是會啓動JobScheduler的Start方法,進行消息循環,在JobScheduler的start內部會構造JobGenerator和ReceiverTacker,而且調用JobGenerator和ReceiverTacker的start方法

 

  1.JobGenerator啓動後會不斷的根據batchDuration生成一個個的Job

 

  2.ReceiverTracker啓動後首先在Spark Cluster中啓動Receiver(實際上是在Executor中先啓動ReceiverSupervisor),在Receiver收到數據後會經過ReceiverSupervisor存儲到Executor而且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker內部會經過ReceivedBlockTracker來管理接受到的元數據信息

 

  每一個BatchInterval會產生一個具體的Job,其實這裏的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD的DAG而已,從Java角度講,至關於Runnable接口實例,此時要想運行Job須要提交給JobScheduler,在JobScheduler中經過線程池的方式找到一個單獨的線程來提交Job到集羣運行(實際上是在線程中基於RDD的Action觸發真正的做業的運行)。

 

  爲何使用線程池呢?

 

   1.做業不斷生成,因此爲了提高效率,咱們須要線程池;這和在Executor中經過線程池執行Task有殊途同歸之妙;

 

   2.有可能設置了Job的FAIR公平調度的方式,這個時候也須要多線程的支持。

 

  第二部分:從容錯架構的角度透視Spark Streaming

 

  咱們知道DStream與RDD的關係就是隨着時間流逝不斷的產生RDD,對DStream的操做就是在固定時間上操做RDD。因此從某種意義上而言,Spark Streaming的基於DStream的容錯機制,實際上就是劃分到每一次造成的RDD的容錯機制,這也是Spark Streaming的高明之處。

 

  RDD做爲 分佈式彈性數據集,它的彈性主要體如今:

 

  1.自動的分配內存和硬盤,優先基於內存

 

  2.基於lineage容錯機制

 

  3.task會指定次數的重試

 

  4.stage失敗會自動重試

 

  5.checkpoint和persist 複用

 

  6.數據調度彈性:DAG,TASK和資源管理無關。

 

  7.數據分片的高度彈性

 

  基於RDD的特性,它的容錯機制主要就是兩種:一是checkpoint,二是基於lineage(血統)的容錯。通常而言,spark選擇血統容錯,由於對於大規模的數據集,作檢查點的成本很高。可是有的狀況下,不如說lineage鏈條過於複雜和冗長,這時候就須要作checkpoint。

 

  考慮到RDD的依賴關係,每一個stage內部都是窄依賴,此時通常基於lineage容錯,方便高效。在stage之間,是寬依賴,產生了shuffle操做,這種狀況下,作檢查點則更好。總結來講,stage內部作lineage,stage之間作checkpoint。

後續的會有什麼更深的內幕?且聽下回分解。

相關文章
相關標籤/搜索