Task是可執行的實體。是Spark任務調度的最小單元。每一個Task都對應一個RDD的分區,也對應Executor任務執行線程池中的一個執行線程。數組
本文介紹Task的基本概念,並分析Task的建立過程。緩存
從實現層面講,一個Stage是一個並行執行的Task集合,它們執行相同的計算邏輯,並做爲Spark Job執行的一部分,在同一個Stage中全部的Tasks都具備相同的shuffle依賴(在Stage的劃分一節分析過:Stage是按Shuffle依賴爲邊界進行劃分的,因此同一個Stage中的Task能夠經過Pipeline運行)。網絡
前面的文章提到過,Job中的分區對應RDD的分區,而在Spark中RDD中的一個分區對應了Stage中的一個任務,它屬於一個RDD用於計算執行函數的部分結果,這些結果做爲Spark Job結果的一部分。ide
圖1 分區和Task的對應關係
函數
與Stage相對應,Task分爲兩類:this
ResultTask:這類Task會計算Job的最終結果,並返回結果。spa
ShuffleMapTask:這類Task計算Job中間步驟的結果,並把結果保存到中間的輸出文件中。線程
ResultTask是因爲執行了Action操做,在提交任務時建立的。執行Action操做時,其實就是使用一個函數來處理中間階段的輸出分區文件。orm
咱們以action的count()操做爲例來具體說ResultTask的建立和使用過程。count()操做的代碼實現很是簡單,以下:對象
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
咱們再看一下代碼中runJob函數的定義:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U):Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
代碼中,runJob函數的含義是,把func函數做用於rdd的每一個分區,這樣就獲得了每一個分區的count結果(其結果是一個數組),而後再調用sum函數把這個數組的各個count數加起來,獲得最終結果。
咱們來看一下ResultTask的實現類,在該類中,定義了運行Task時的函數runTask,在該函數中會執行如下代碼:
override def runTask(context: TaskContext): U = {
...
// 觸發action時執行的函數
func(context, rdd.iterator(partition, context))
}
從實現代碼能夠看出ResultTask,會直接使用func在輸出的分區數據上。
這類任務主要爲shuffle過程生成中間文件,它會經過shuffleManager來獲取ShuffleWriter,並使用該ShuffleWriter來保存shuffle過程當中的RDD分區。該類的runTask函數的主要實現代碼以下:
override def runTask(context: TaskContext): MapStatus = {
...
var writer: ShuffleWriter[Any, Any] = null
try {
// 獲取shffleManager對象
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle,partitionId, context)
writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
...
}
從以上代碼能夠看出:ShuffleMapTask會先經過ShuffleManager來獲取ShuffleWriter。並經過ShuffleWriter來寫入RDD的分區。
注意:當寫入RDD的分區時,會根據RDD的依賴關係依次計算其父RDD的分區數據。
爲了減小網絡傳輸,提高計算效率,Spark會爲Task選擇最佳的執行位置,所謂選擇最佳執行位置,其實就是選擇在哪一個worker節點上執行Task。不一樣RDD的尋找最佳執行位置的方式不一樣,因此,在RDD的抽象父類中,定義了一個preferredLocations函數,具體類型的RDD經過本身的方式來實現該函數。
在DAGScheduler中,RDD分區計算的最佳位置在DAGScheduler#getPreferredLocs函數中完成。該函數的實現步驟以下:
若分區已經被訪問過,說明該分區的位置已經被記錄下來了,此時不須要再查找該分區的最佳位置。
若RDD的分區數據被緩存起來了,就直接返回緩存的位置。緩存的位置是經過存儲系統BlockManagerMaster來進行查找的,會調用BlockManagerMaster#getLocations函數。
若1和2都沒有找到RDD分區的最佳位置,就查看RDD是否在建立時帶有最佳位置信息,每種RDD的最佳位置信息是不一樣的,這些最佳位置經過RDD#preferredLocations函數返回。下表是其中幾種RDD的最佳位置信息:
本文介紹了Task的基本概念和分類,並分析了Task最佳位置的肯定。