基於案例貫通Spark Streaming流計算框架運行源碼8

先貼下案例源碼apache

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // 每5秒收割一次數據
    val lines = ssc.socketTextStream("localhost", 9999) // 監聽 本地9999 socket 端口
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flat map 後 reduce
    words.print() // 打印結果
    ssc.start() // 啓動
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

 

上文已經從源碼分析到將Receiver做爲RDD提交給Spark,高層調度器調度此JobSubmitted事件後,發送LaunchTask消息給Executor。本文主要聚焦executor上運行Receiversocket

瞭解Spark Core的讀者都知道,Executor是運行在Worker所在的節點的。具體的進程名爲CoarseGrainedExecutorBackend。固然,這是在Standalone集羣模式下。ide

看下CoarseGrainedSchedulerBackend中,給Executor發送事件函數

// CoarseGrainedSchedulerBackend.scala line 244
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

 

CoarseGrainedExecutorBackend中的 receive方法。oop

// CoarseGrainedExecutorBackend.scala line 89
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

 

launchTask,源碼分析

  1. 實例化TaskRunner,此TaskRunner實現了Runnableui

  2. 提交給線程池執行this

// Executor.scala line 118
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

 

TaskRunner,繼承自Runnable。spa

run().net

  1. 反序列化Task;line 193

  2. 執行Task;line 213

// Executor.scala line 152
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable{
 // ... 一些成員變量初始化 
 
// line 180
override def run(): Unit = {
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  startGCTime = computeTotalGcTime()

  try {
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) 
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)   // line 193
    task.setTaskMemoryManager(taskMemoryManager)
    // ... 一些代碼

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    var threwException = true
    val (value, accumUpdates) = try {
      val res = task.run(                            // line 213
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      if (freedMemory > 0) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
          throw new SparkException(errMsg)
        } else {
          logError(errMsg)
        }
      }
    }
    val taskFinish = System.currentTimeMillis()

    // ... 一些代碼

    val resultSer = env.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()

    // ... 一些代碼

    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

  } catch {
    // ... 一些代碼
  } finally {
    runningTasks.remove(taskId)
  }
}
}

 

Task.run中  runTask。此處是ResultTask.scala

// Task.scala line 67
final def run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    (runTask(context), context.collectAccumulators())    // line 89
  } finally {
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
        // Notify any tasks waiting for execution memory to be freed to wake up and try to
        // acquire memory again. This makes impossible the scenario where a task sleeps forever
        // because there are no other tasks left to notify it. Since this is safe to do but may
        // not be strictly necessary, we should revisit whether we can remove this in the future.
        val memoryManager = SparkEnv.get.memoryManager
        memoryManager.synchronized { memoryManager.notifyAll() }
      }
    } finally {
      TaskContext.unset()
    }
  }
}

 

ResultTask.runTask

  1. 反序列化出方法func和RDD;line 61;

  2. 執行此方法;line 66;而此時的方法就是SparkContext.scala line 1992:(context: TaskContext, iter: Iterator[T]) => cleanF(iter)

傳入 TaskContext和一個Iterator[Receiver]參數,調用cleanF(iter)。

// ResultTask.scala line 57
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}

 

而cleanF(iter)就是 ReceiverTracker.scala line 564中的startReceiverFunc

再回顧下startReceiverFunc的定義。此方法就是一個包含了Receiver類型的迭代器,返回Unit的函數。

// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

 

此時,會循環獲取此Receiver,實例化ReceiverSupervisorImpl。而後start。

至此,Receiver才真正的在Worker的Executor上執行。

這纔是流處理的第一步,接受數據器已經啓動。

 

下節將從源碼分析數據是如何接收的。

相關文章
相關標籤/搜索