Apache Spark Source Code Walkthrough, Part 3 -- Analysis of the Function Call Chain During Task Execution

Reposting is welcome; please credit the original source, 徽滬一郎.

Overview

This article explains how the business logic of a task executed inside TaskRunner actually gets invoked. It also tries to make clear where a running task obtains its input data, where its results are returned to, and how they get there.

Prerequisites

  1. Spark has been installed
  2. Spark is running in local mode or local-cluster mode

local-cluster mode

The local-cluster mode is also known as pseudo-distributed mode. It can be started with the following command:

MASTER=local[1,2,1024] bin/spark-shell

[1,2,1024] stands for the executor number, the core number, and the memory size respectively; the memory size should not be smaller than the default 512 MB.
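
For comparison, plain local mode runs everything inside a single JVM. Below is a minimal sketch of a standalone program that achieves the same thing as MASTER=local[2] (hypothetical app code, assuming the SparkConf/SparkContext API of this era):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone program: equivalent to launching with MASTER=local[2],
// i.e. local mode with 2 worker threads in one JVM.
object LocalModeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("local-mode-example").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    println("default parallelism = " + sc.defaultParallelism)
    sc.stop()
  }
}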

Analysis of the Driver Program Initialization Process

Main source files involved in the initialization process

  1. SparkContext.scala       entry point of the whole initialization process
  2. SparkEnv.scala           creates BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
  3. DAGScheduler.scala       entry point for job submission; the key place where a job is split into stages
  4. TaskSchedulerImpl.scala  decides how many tasks each stage may run and on which executor each task runs
  5. SchedulerBackend
    1. for the simplest single-machine run mode, see LocalBackend.scala
    2. for cluster mode, see the source file SparkDeploySchedulerBackend

Initialization steps in detail

Step 1: Create a SparkConf from the initialization arguments, then use it to create the SparkEnv. The SparkEnv mainly contains the following key components: 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

private[spark] val env = SparkEnv.create(
    conf,
    "",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal)
  SparkEnv.set(env)
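
Spark's own code later retrieves these components through SparkEnv.get, which is exactly what the BlockStoreShuffleFetcher pseudocode near the end of this article does. A minimal sketch of the accessors involved (internal API, shown only for illustration):

// Internal API, for illustration only: how Spark code reaches the components
// created in this step (the same accessors appear in the fetch pseudocode below).
val env = SparkEnv.get
val blockManager     = env.blockManager      // block storage on this node
val mapOutputTracker = env.mapOutputTracker  // tracks where shuffle map outputs live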

Step 2: Create the TaskScheduler, pick the SchedulerBackend that matches Spark's run mode, and start the TaskScheduler. This step is crucial.

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
  taskScheduler.start()

TaskScheduler.start starts the corresponding SchedulerBackend and, when speculation is enabled, also starts a timer that periodically checks for speculatable tasks.

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        checkSpeculatableTasks()
      }
    }
  }
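
The speculation branch above only runs when the flag is switched on. A hedged sketch of the configuration (spark.speculation is the exact key read by the code above; it defaults to false):

import org.apache.spark.SparkConf

// spark.speculation is the flag checked in TaskSchedulerImpl.start above.
// It defaults to false, so the periodic checkSpeculatableTasks() scan is normally not armed.
val conf = new SparkConf()
  .setAppName("speculation-example")
  .setMaster("local[2]")
  .set("spark.speculation", "true")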

Step 3: Create the DAGScheduler, passing in the TaskScheduler instance created in the previous step, and start it.

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()

Step 4: Start the web UI.

ui.start()

The RDD Transformation Process

We again use the simplest wordcount example to walk through the RDD transformation process.

sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

This one short line of code actually triggers a rather involved chain of RDD transformations. Each step of the chain and its result is explained below.

Step 1: val rawFile = sc.textFile("README.md")

textFile first creates a HadoopRDD, and then a map operation turns it into a MappedRDD. Running the statement in spark-shell produces output that confirms this analysis:

scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took  277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took  281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:13

Step 2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap turns the MappedRDD into a FlatMappedRDD:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))

Step 3: val wordCount = splittedText.map(word => (word, 1))

Each word is turned into a key-value pair, and the FlatMappedRDD from the previous step becomes a MappedRDD again.

Step 4: val reduceJob = wordCount.reduceByKey(_ + _). This is the most involved step.

The operations used in steps 2 and 3 are all defined in RDD.scala, but reduceByKey is nowhere to be found there. Its definition lives in the source file PairRDDFunctions.scala.

A careful reader will surely ask: reduceByKey is not a member of MappedRDD, so how can it be called on one? Behind the scenes an implicit conversion takes place, wrapping the MappedRDD in a PairRDDFunctions:

implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
    new PairRDDFunctions(rdd)

This kind of implicit conversion is a Scala language feature. To learn more, search for "scala implicit method"; there are plenty of articles covering it in detail.
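
For readers new to the feature, here is a tiny sketch of the same mechanism (hypothetical names, nothing to do with Spark) that can be pasted into a Scala REPL; an implicit conversion lets us call a method the original type never declared:

// Hypothetical example of the language feature behind rddToPairRDDFunctions.
class RichInt(val self: Int) {
  def squared: Int = self * self
}

implicit def intToRichInt(i: Int): RichInt = new RichInt(i)

val n = 3.squared   // the compiler rewrites this as intToRichInt(3).squared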

Next, take a look at the definition of reduceByKey:

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      // Already partitioned the way we need it: combine values locally, no shuffle.
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      // Combine on the map side first, shuffle the partial results, then merge the combiners.
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }

reduceByKey ultimately calls combineByKey. Inside this function the PairRDDFunctions wrapper produces a ShuffledRDD, and once mapPartitionsWithContext is called the ShuffledRDD is turned into a MapPartitionsRDD.

Log輸出能證實咱們的分析

res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:13
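
For completeness, combineByKey can also be called directly. A minimal sketch (hypothetical data, using the overload that falls back to the default partitioner) that builds a per-key (sum, count) pair and derives an average from it:

// Hypothetical example: per-key (sum, count) via combineByKey.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),                                            // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners
val avgPerKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }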

Summary of the RDD Transformations

To sum up the whole RDD transformation chain:

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffledRDD->MapPartitionsRDD

Quite a long chain, and every one of these transformations happens before the job is even submitted.
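
In other words, the chain above is purely lazy bookkeeping. Only when an action is invoked does a job actually get submitted. A minimal sketch:

// Declaring the transformations below executes nothing yet;
// the collect() call at the end is the action that actually submits the job.
val counts = sc.textFile("README.md")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.collect()   // action: the job is submitted to DAGScheduler here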

Analysis of the Execution Process

Classification of dataset operations

Before analyzing the function calls made while a task runs, let us first look at something slightly more theoretical: why do the transformations applied to RDDs look the way they do?

The answer has a mathematical flavor. From an abstract point of view, any processing task boils down to "input -> processing -> output", where input and output are datasets.

On that basis we can make a simple classification (illustrated by the sketch after the list):

  1. one-to-one: one dataset is transformed into another dataset of the same size, e.g. map
  2. one-to-one: one dataset is transformed into another dataset whose size changes, either growing or shrinking; flatMap typically grows the size, while subtract shrinks it
  3. many-to-one: several datasets are merged into one, e.g. combine, join
  4. one-to-many: one dataset is split into several, e.g. groupBy
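
A rough illustration of the four categories in RDD terms (hypothetical data; the method names are the usual RDD operations):

val nums  = sc.parallelize(1 to 4)
val lines = sc.parallelize(Seq("a b", "c d e"))

val doubled = nums.map(_ * 2)                                         // 1. one-to-one, size unchanged
val words   = lines.flatMap(_.split(" "))                             // 2. one-to-one, size changes
val joined  = nums.map(n => (n, n)).join(nums.map(n => (n, n * n)))   // 3. many-to-one
val groups  = nums.groupBy(_ % 2)                                     // 4. one-to-many (one group per key)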

Function Calls During Task Execution

For the task submission process, see the second article in this series. This section explains how a running task ends up, step by step, invoking the operations that were applied to the RDD (a sketch of RDD.iterator follows the call chain below).

  • TaskRunner.run
    • Task.run
      • Task.runTask (Task is a base class with two subclasses, ShuffleMapTask and ResultTask)
        • RDD.iterator
          • RDD.computeOrReadCheckpoint
            • RDD.compute 
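
Within this chain, RDD.iterator is the gatekeeper: it first checks whether the partition is cached and only falls back to computeOrReadCheckpoint when it is not. A simplified paraphrase of its logic (not the exact source, and it may differ slightly between versions):

// Simplified paraphrase of RDD.iterator (assumption: 1.0-era internals).
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // The RDD is marked for caching: let CacheManager return the cached
    // partition or compute and cache it.
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    // Not cached: read from the checkpoint if one exists, otherwise call compute.
    computeOrReadCheckpoint(split, context)
  }
}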

Looking at the definition of RDD.compute, you might still feel that f is never actually called. Take MappedRDD's compute as an example:

override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)

Note that the easiest place to be misled is the map call: this map is not RDD.map but the map method defined on Scala's Iterator; see http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator
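
A quick way to convince yourself of the difference: Iterator.map is plain Scala, lazy, and processes elements one at a time as they are pulled. A small console experiment (hypothetical values):

// Plain Scala, no Spark involved: Iterator.map is lazy,
// which is exactly the behavior MappedRDD.compute relies on.
val it = Iterator(1, 2, 3).map { x => println(s"mapping $x"); x * 2 }
// Nothing has been printed yet; elements are transformed only as they are consumed:
it.foreach(println)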

Stack trace output

at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:154)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

ResultTask

The compute path for a ShuffleMapTask is fairly convoluted, with quite a few indirections; for a ResultTask it is much more direct:

override def runTask(context: TaskContext): U = {
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }
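
To make func concrete: for an action such as collect(), the function shipped to each ResultTask essentially drains the partition iterator into an array, and the driver concatenates the per-partition results. A rough paraphrase (simplified, not the exact source):

// Rough sketch of what collect() does via runJob: each ResultTask applies
// (iter: Iterator[String]) => iter.toArray to its partition.
val rdd = sc.textFile("README.md")
val perPartition: Array[Array[String]] =
  sc.runJob(rdd, (iter: Iterator[String]) => iter.toArray)
val allLines: Array[String] = Array.concat(perPartition: _*)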

Passing the Computation Results

From the analysis above we know that, once the wordcount job is finally submitted, DAGScheduler splits it into two stages: the first stage runs ShuffleMapTasks, the second runs ResultTasks.

So how does a ResultTask get hold of the results computed by the ShuffleMapTasks? In brief, the process is as follows:

  1. ShuffleMapTask wraps the status of its computation (note: the status, not the data itself) as a MapStatus and returns it to DAGScheduler
  2. DAGScheduler saves the MapStatus in MapOutputTrackerMaster
  3. when the ResultTask reaches the ShuffledRDD, it calls BlockStoreShuffleFetcher's fetch method to obtain the data
    1. the first thing fetch does is ask MapOutputTrackerMaster for the locations of the data it needs
    2. based on the answer, it calls BlockManager.getMultiple to fetch the actual data

Pseudocode of BlockStoreShuffleFetcher's fetch function:

val blockManager = SparkEnv.get.blockManager

val startTime = System.currentTimeMillis
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  shuffleId, reduceId, System.currentTimeMillis - startTime))

val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)

Note getServerStatuses and getMultiple in the code above: the former asks where the data lives, the latter fetches the actual data.

For a detailed explanation of the shuffle, see the article "詳細探究Spark的shuffle實現": http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
