Task的基本概念

 Task是可執行的實體。是Spark任務調度的最小單元。每一個Task都對應一個RDD的分區,也對應Executor任務執行線程池中的一個執行線程。數組

    本文介紹Task的基本概念,並分析Task的建立過程。緩存

Task和分區

從實現層面講,一個Stage是一個並行執行的Task集合,它們執行相同的計算邏輯,並做爲Spark Job執行的一部分,在同一個Stage中全部的Tasks都具備相同的shuffle依賴(在Stage的劃分一節分析過:Stage是按Shuffle依賴爲邊界進行劃分的,因此同一個Stage中的Task能夠經過Pipeline運行)。網絡

前面的文章提到過,Job中的分區對應RDD的分區,而在Spark中RDD中的一個分區對應了Stage中的一個任務,它屬於一個RDD用於計算執行函數的部分結果,這些結果做爲Spark Job結果的一部分。ide

image.png

                圖1 分區和Task的對應關係
函數

Task的分類

與Stage相對應,Task分爲兩類:this

  • ResultTask:這類Task會計算Job的最終結果,並返回結果。spa

  • ShuffleMapTask:這類Task計算Job中間步驟的結果,並把結果保存到中間的輸出文件中。線程

Task的實現

ResultTask

ResultTask是因爲執行了Action操做,在提交任務時建立的。執行Action操做時,其實就是使用一個函數來處理中間階段的輸出分區文件。orm

咱們以action的count()操做爲例來具體說ResultTask的建立和使用過程。count()操做的代碼實現很是簡單,以下:對象

 def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

咱們再看一下代碼中runJob函數的定義:

   def runJob[TUClassTag](rddRDD[T], funcIterator[T=> U):Array[U= {
     runJob(rddfunc0 until rdd.partitions.length)
  }

代碼中,runJob函數的含義是,把func函數做用於rdd的每一個分區,這樣就獲得了每一個分區的count結果(其結果是一個數組),而後再調用sum函數把這個數組的各個count數加起來,獲得最終結果。

咱們來看一下ResultTask的實現類,在該類中,定義了運行Task時的函數runTask,在該函數中會執行如下代碼:

  override def runTask(contextTaskContext): U = {
  ...
     // 觸發action時執行的函數
     func(contextrdd.iterator(partitioncontext))
  }

從實現代碼能夠看出ResultTask,會直接使用func在輸出的分區數據上。

ShuffleMapTask類

這類任務主要爲shuffle過程生成中間文件,它會經過shuffleManager來獲取ShuffleWriter,並使用該ShuffleWriter來保存shuffle過程當中的RDD分區。該類的runTask函數的主要實現代碼以下:

 override def runTask(contextTaskContext): MapStatus = {
 ...
 var writerShuffleWriter[AnyAny= null
   try {
     // 獲取shffleManager對象
     val manager = SparkEnv.get.shuffleManager
     writer = manager.getWriter[AnyAny](dep.shuffleHandle,partitionIdcontext)
     writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_ <: Product2[AnyAny]]])
     writer.stop(success = true).get
  } catch {
    ...
  }

從以上代碼能夠看出:ShuffleMapTask會先經過ShuffleManager來獲取ShuffleWriter。並經過ShuffleWriter來寫入RDD的分區。

注意:當寫入RDD的分區時,會根據RDD的依賴關係依次計算其父RDD的分區數據。

Task的最佳位置的選擇

爲了減小網絡傳輸,提高計算效率,Spark會爲Task選擇最佳的執行位置,所謂選擇最佳執行位置,其實就是選擇在哪一個worker節點上執行Task。不一樣RDD的尋找最佳執行位置的方式不一樣,因此,在RDD的抽象父類中,定義了一個preferredLocations函數,具體類型的RDD經過本身的方式來實現該函數。

在DAGScheduler中,RDD分區計算的最佳位置在DAGScheduler#getPreferredLocs函數中完成。該函數的實現步驟以下:

  1. 若分區已經被訪問過,說明該分區的位置已經被記錄下來了,此時不須要再查找該分區的最佳位置。

  2. 若RDD的分區數據被緩存起來了,就直接返回緩存的位置。緩存的位置是經過存儲系統BlockManagerMaster來進行查找的,會調用BlockManagerMaster#getLocations函數。

  3. 若1和2都沒有找到RDD分區的最佳位置,就查看RDD是否在建立時帶有最佳位置信息,每種RDD的最佳位置信息是不一樣的,這些最佳位置經過RDD#preferredLocations函數返回。下表是其中幾種RDD的最佳位置信息:

image.png

小結

本文介紹了Task的基本概念和分類,並分析了Task最佳位置的肯定。

相關文章
相關標籤/搜索