First, the example source code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest data once every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // reduce after flatMap
    words.print() // print the result
    ssc.start() // start
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
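For this job to produce output, something must be writing lines to localhost:9999. A quick way is nc -lk 9999; alternatively, here is a minimal self-contained feeder. This is a sketch of my own, not part of the original example, and the object name SocketFeeder is made up:

import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical test driver: serves one line per second on port 9999
// so the streaming job above has something to consume.
object SocketFeeder {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val socket = server.accept() // block until socketTextStream's receiver connects
    val out = new PrintWriter(socket.getOutputStream, true) // autoFlush = true
    while (true) {
      out.println("hello spark streaming")
      Thread.sleep(1000)
    }
  }
}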
The previous article followed the source code to the point where the Receiver is wrapped in an RDD and submitted to Spark: after the high-level scheduler processes the JobSubmitted event, a LaunchTask message is sent to the Executor. This article focuses on how the Receiver runs on the Executor.
Readers familiar with Spark Core will know that the Executor runs on the Worker's node, in a process named CoarseGrainedExecutorBackend. This applies, of course, to Standalone cluster mode.
Look at where CoarseGrainedSchedulerBackend sends the event to the Executor:
// CoarseGrainedSchedulerBackend.scala line 244
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
The receive method in CoarseGrainedExecutorBackend:
// CoarseGrainedExecutorBackend.scala line 89
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
launchTask:
Instantiates a TaskRunner; TaskRunner implements Runnable.
Submits it to the thread pool for execution.
// Executor.scala line 118
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
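To make the mechanics concrete, here is a stripped-down sketch of the same pattern. It is my own simplification, not Spark's code: wrap the work in a Runnable, track it by taskId, and hand it to a thread pool.

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchSketch {
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool = Executors.newCachedThreadPool() // Spark uses a daemon cached pool

  def launchTask(taskId: Long, serializedTask: ByteBuffer)(body: ByteBuffer => Unit): Unit = {
    val tr = new Runnable {
      override def run(): Unit =
        try body(serializedTask)
        finally runningTasks.remove(taskId) // mirrors the finally block in TaskRunner.run
    }
    runningTasks.put(taskId, tr)
    threadPool.execute(tr) // returns immediately; the task runs on a pool thread
  }
}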
TaskRunner extends Runnable.
run():
Deserializes the Task; line 193.
Executes the Task; line 213.
// Executor.scala line 152
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {

  // ... some member variable initialization

  // line 180
  override def run(): Unit = {
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    val deserializeStartTime = System.currentTimeMillis()
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      updateDependencies(taskFiles, taskJars)
      task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // line 193
      task.setTaskMemoryManager(taskMemoryManager)

      // ... some code

      logDebug("Task " + taskId + "'s epoch is " + task.epoch)
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      var threwException = true
      val (value, accumUpdates) = try {
        val res = task.run( // line 213
          taskAttemptId = taskId,
          attemptNumber = attemptNumber,
          metricsSystem = env.metricsSystem)
        threwException = false
        res
      } finally {
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
            throw new SparkException(errMsg)
          } else {
            logError(errMsg)
          }
        }
      }

      val taskFinish = System.currentTimeMillis()

      // ... some code

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      // ... some code

      val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
      // ... some code
    } finally {
      runningTasks.remove(taskId)
    }
  }
}
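One detail worth calling out in run() is how the task result travels back to the driver. The serializedResult logic above boils down to three cases, condensed in this sketch of my own (simplified names):

// - above spark.driver.maxResultSize: drop the result, send only its size
// - too big for one RPC frame (akkaFrameSize): stash the bytes in the
//   BlockManager and send an IndirectTaskResult holding the block id
// - otherwise: send the bytes directly with the FINISHED status update
def routeResult(resultSize: Long, maxResultSize: Long, frameSize: Long): String =
  if (maxResultSize > 0 && resultSize > maxResultSize) "dropped (too large)"
  else if (resultSize >= frameSize) "indirect via BlockManager"
  else "direct to driver"

For a receiver task this barely matters, since startReceiverFunc returns Unit, but it explains the three logInfo/logWarning branches above.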
Inside Task.run, runTask is invoked. Here the concrete task is a ResultTask.
// Task.scala line 67
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    (runTask(context), context.collectAccumulators()) // line 89
  } finally {
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
        // Notify any tasks waiting for execution memory to be freed to wake up and try to
        // acquire memory again. This makes impossible the scenario where a task sleeps forever
        // because there are no other tasks left to notify it. Since this is safe to do but may
        // not be strictly necessary, we should revisit whether we can remove this in the future.
        val memoryManager = SparkEnv.get.memoryManager
        memoryManager.synchronized { memoryManager.notifyAll() }
      }
    } finally {
      TaskContext.unset()
    }
  }
}
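The TaskContext.setTaskContext / TaskContext.unset pair is a thread-local handoff: it is what later lets startReceiverFunc call TaskContext.get() from inside the task body. A minimal sketch of the pattern, generic and not Spark's actual fields:

object ThreadLocalSketch {
  private val current = new ThreadLocal[String]() // Spark stores a TaskContextImpl here

  def runWith[A](context: String)(body: => A): A = {
    current.set(context)     // pin the context to this thread, like setTaskContext
    try body
    finally current.remove() // always clear it, like TaskContext.unset()
  }

  def get(): String = current.get() // what TaskContext.get() does, in spirit

  def main(args: Array[String]): Unit =
    println(runWith("task-0") { s"inside body, context = ${get()}" })
}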
ResultTask.runTask:
Deserializes the function func and the RDD; line 61.
Executes that function; line 66. The function at this point is the one from SparkContext.scala line 1992: (context: TaskContext, iter: Iterator[T]) => cleanF(iter) (see the small sketch after the code below).
It is handed the TaskContext and an Iterator[Receiver] argument, and it calls cleanF(iter).
// ResultTask.scala line 57
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( // line 61
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context)) // line 66
}
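The func deserialized here has the two-argument shape (TaskContext, Iterator[T]) => U because SparkContext wraps the user-level Iterator[T] => U before submission, which is exactly what line 1992 shows. The wrapping is trivial; a sketch:

import org.apache.spark.TaskContext

object AdaptSketch {
  // A user-level Iterator[T] => U is adapted to the (TaskContext, Iterator[T]) => U
  // shape that tasks carry; the TaskContext argument is simply ignored here.
  def adapt[T, U](cleanF: Iterator[T] => U): (TaskContext, Iterator[T]) => U =
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter)
}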
And cleanF(iter) is precisely the startReceiverFunc from ReceiverTracker.scala line 564.
To recap the definition of startReceiverFunc: it is a function that takes an iterator over Receiver and returns Unit.
// ReceiverTracker.scala line 564
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
Here the Receiver is taken from the iterator (the assert confirms there is exactly one per task), a ReceiverSupervisorImpl is instantiated around it, and then started. Note the attemptNumber == 0 guard: if the task is a retry launched by the TaskScheduler, the function exits immediately, since the ReceiverTracker wants to reschedule the receiver itself.
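Because supervisor.awaitTermination() blocks, this task never finishes on its own: the receiver occupies its core for the lifetime of the application. A minimal sketch of that start/await shape, my own and not ReceiverSupervisorImpl's real internals:

import java.util.concurrent.CountDownLatch

class SupervisorSketch(receiverBody: () => Unit) {
  private val stopped = new CountDownLatch(1)

  def start(): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit =
        try receiverBody()          // the receiver loop runs on its own thread
        finally stopped.countDown() // unblock awaitTermination when it ends
    })
    t.setDaemon(true)
    t.start()
  }

  def awaitTermination(): Unit = stopped.await() // blocks the task thread, as in startReceiverFunc
}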
Only now is the Receiver truly executing on the Worker's Executor.
And this is just the first step of stream processing: the receiver that takes in the data has been started.
The next installment will trace through the source code how data is actually received.