經過案例對SparkStreaming 透徹理解三板斧之一:解密SparkStreaming另類實驗

Spark中程序最容易出錯的是流處理,流處理也是目前spark技術瓶頸之一,因此要作出一個優秀的spark發行版的話,對流處理的優化是必需的。html

根據spark歷史演進的趨勢,spark graphX,機器學習已經發展得很是好。對它進行改進是重要的,單不是最重要的。最最重要的仍是流處理,而流處理最爲核心的是流處理結合機器學習,圖計算的一體化結合使用,真正的實現一個堆棧rum them all .node

1 流處理最容易出錯apache

2 流處理結合圖計算和機器學習將發揮出巨大的潛力api

3 構造出複雜的實時數據處理的應用程序網絡

流處理實際上是構建在spark core之上的一個應用程序app

一:代碼案例:機器學習

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by hadoop on 2016/4/18.
  * 背景描述 在廣告點擊計費系統中 咱們在線過濾掉 黑名單的點擊 進而保護廣告商的利益
  * 只有效的廣告點擊計費
  *新浪微博:http://www.weibo.com/ilovepains
  */
object OnlineBlanckListFilter extends App{
  //val basePath = "hdfs://master:9000/streaming"
  val conf = new SparkConf().setAppName("SparkStreamingOnHDFS")
  if(args.length == 0) conf.setMaster("spark://Master:7077")
  val ssc = new StreamingContext(conf, Seconds(30))
  val blackList = Array(("hadoop", true) , ("mahout", true), ("spark", false))
  val backListRDD = ssc.sparkContext.parallelize(blackList)
  val adsClickStream = ssc.socketTextStream("192.168.74.132", 9000, StorageLevel.MEMORY_AND_DISK_SER_2)

  val rdd = adsClickStream.map{ads => (ads.split(" ")(1), ads)}
  val validClicked = rdd.transform(userClickRDD => {
    val joinedBlackRDD = userClickRDD.leftOuterJoin(backListRDD)
    joinedBlackRDD.filter(joinedItem => {
      if(joinedItem._2._2.getOrElse(false)){
        false
      }else{
        true
      }
    })
  })

  validClicked.map(validClicked => {
    validClicked._2._1
  }).print()

  ssc.start()
  ssc.awaitTermination()
}

  二:sparkStreamUI 觀察任務狀態:socket

  

    思考:這裏一共有5個JOB,第 2 3 4 是咱們在代碼中觸發的JOB那麼 第0和第1個JOB從何而來?ide

    咱們查看JOB0 的UI:oop

  

  咱們發現這個任務是咱們的應用程序啓動後就有了,思考:這個JOB是幹什麼的?

  咱們從源碼出發

  

/**類名:ReceiverTracker
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

   咱們關注

runDummySparkJob

  這個方法:

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

  這個註釋清楚地說明了這個任務的做用,爲了最大化第利用集羣資源,避免數據接收都在一個節點上

  如今咱們繼續關注JOB1 這個任務:

  

  咱們繼續查看這個任務的詳情:

  

     筆者這次數據的來源是用 nc -lk 9000(在worker2 這個節點上運行的)

     咱們懷疑這個就是數據的接收點?

     源碼中找解釋:

     

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

   咱們看到源碼中有這樣一句話:Create the RDD using the scheduledLocations to run the receiver in a Spark job

   說明數據的接收者是以任務的方式運行在Worker節點上,這說明了SparkStreaming能夠極大話地利用集羣資源,各個節點    均可以接收數據,數據產生以後會放在BlockManager 裏邊(後續源碼繼續分析)。將數據存儲起來。

   思考:數據被收集起來,那麼咱們真正的計算髮生在哪裏?先查看下WEBUI:

   

    這裏咱們看到了咱們的代碼邏輯。任務仍是經過JobScheduler 這個類經過線程池的方式提交給集羣運行的。

  附上JOB提交源碼:

def run() {
  try {
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}

  經過跟蹤源碼發現 job.run 方法指向了外部傳入的一個方法:

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

   也就是說是經過Dstream的generateJob方法來向集羣提交任務的(DStreamGraph調用generateJobs觸發了Dstream類      的generateJob這個方法)DStreamGraph記錄了Dstream的邏輯轉關係,最終將Dstream上的轉換關係回溯生成RDD實      例,構 成了RDD的DAG,觸發條件就是一個咱們自定義的一個Batch,這裏調用了getOrCompute 方法來回溯Dstream的    轉換, 生 成了RDD實例和RDD的DAG。

  附上一個sparkStreaming 總體流程圖

   

   從這裏咱們能得出什麼結論?

   1.數據的收集發生在spark集羣中的Worker節點,數據接收器(Receiver),是以一個JOB來接收數據的!

   2.真正的計算也發生在Worker上,spark集羣把任務分發到各個節點,及數據的接收在一個節點,而數據的收集在一個節點   (針對這次實踐)

   3.sparkStreaming 中各個任務是配合起來工做的,至於爲什麼要這樣作後續繼續分析

   4.數據本地性,上邊咱們看到任務的本地性是 PROCESS_LOCAL 這個說明了數據是在內存中,而咱們數據是在一個節上,那這裏的數據必然要通過網絡傳輸(須要通過Shuffle講數據放在計算的節點),每次數據收集的時候會將數據分片,並    且將數據分發到各個計算節點上。

  5.sparkStreaming是以時間爲單位來生成JOB,本質上來說是加上了時間維度的批處理任務

 三: 瞬間理解Spark Streaming本質

 

   DStream是一個沒有邊界的集合,沒有大小的限制。

    DStream表明了時空的概念。隨着時間的推移,裏面不斷產生RDD。

    時間已固定,咱們就鎖定到空間的操做。

    從空間的維度來說,就是處理層面。

    對DStream的操做,構成了DStreamGraph。如如下圖例所示:

   

上圖中每一個foreach都會觸發一個做業,就會順着依賴從後往前回溯,造成DAG,以下圖所示:

空間維度肯定以後,隨着時間不斷推動,會不斷實例化RDD Graph,而後觸發Job去執行處,及上面所說的(generateJobs)這個方法。

再次理解官網的一段話:

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.

相關文章
相關標籤/搜索