這個組件做爲shuffle的一個輔助組件,在整個shuffle模塊中具備很重要的做用。咱們在前面一系列的分析中,或多或少都會提到這個組件,好比在DAGScheduler提交一個stage時會將這個stage封裝成一個任務集(TaskSet),可是可能有的分區已經計算過了,有告終果(stage因爲失敗可能會屢次提交,其中有部分task可能已經計算完成),這些分區就不須要再次計算,而只須要計算那些失敗的分區,那麼很顯然須要有一個組件來維護shuffle過程當中的任務失敗成功的狀態,以及計算結果的位置信息。
此外,在shuffle讀取階段,咱們知道一個reduce端的分區會依賴於多個map端的分區的輸出數據,那麼咱們在讀取一個reduce分區對應的數據時,就須要知道這個reduce分區依賴哪些map分區,每一個block的物理位置是什麼,blockId是什麼,這個block中屬於這個reduce分區的數據量大小是多少,這些信息的記錄維護都是靠MapOutputTracker來實現的,因此咱們如今知道MapOutputTracker的重要性了。數組
MapOutputTracker組件的主要功能類和輔助類所有在這個文件中,我先大概說一下各個類的主要做用,而後重點分析關鍵的類。緩存
總的來看,最核心的類是MapOutputTrackerMaster,其餘的類都是圍繞這個類的一些輔助類,因此咱們重點分析MapOutputTrackerMaster,其餘的類我不打算深刻展開,相信讀者本身也可以較爲輕鬆地理解。框架
這個方法在上面已經提到了,會在DAGScheduler封裝任務集的時候查找一個stage須要計算的分區時會調用到。工具
def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = { shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) }
ShuffleStatus.findMissingPartitionsui
def findMissingPartitions(): Seq[Int] = synchronized { val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null) assert(missing.size == numPartitions - _numAvailableOutputs, s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}") missing }
這兩段代碼很簡單,不用多說,就是從map結構中查找。spa
此外,像registerShuffle,registerMapOutput,unregisterMapOutput,unregisterShuffle,removeOutputsOnHost等等,咱們能夠看到這幾個方法自己都是很簡答的,無非就是對內部map結構的插入,更新和查找,關鍵的是你要清楚這些方法的調用時機是什麼?弄清這一點,會讓咱們對MapOutputTracker在整個spark框架中的做用和充當的角色有更深的理解。方法的調用地點,經過Idea這類IDE工具其實均可以很簡單地定位到,這裏我不作過多展開,僅僅簡單地歸納一下:scala
咱們來看另外一個比較重要的方法,在reduce階段讀取數據時,一個task首先須要知道它依賴於哪些map輸出,這時它回想driver端的MapOutputTrackerMasterEndpoint組件發送一個獲取map輸出的消息,通過一系列方法調用最終會調用這個方法:code
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => // 將全部的mapStatus數組轉換成(BlockManagerId, Seq[(BlockId, Long)])對象 shuffleStatus.withMapStatuses { statuses => MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } case None => Seq.empty } }
咱們看一下:MapOutputTracker.convertMapStatuses,這個方法也很簡單,其實就是將每一個map分區輸出切分紅reduce分區數量,最後產生的(BlockId, Long)元組數量等於map分區數量*reduce分區數量。對象
def convertMapStatuses( shuffleId: Int, startPartition: Int, endPartition: Int, statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { assert (statuses != null) // 用於存放結果 val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]] // 最後產生的(BlockId, Long)元組數量等於map分區數量*reduce分區數量 for ((status, mapId) <- statuses.zipWithIndex) { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" logError(errorMessage) throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) } else { for (part <- startPartition until endPartition) { splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part))) } } } splitsByAddress.toSeq }
咱們來看另一個比較重要的方法。咱們知道reduce端的分區通常會依賴於多個map端分區輸出,可是對於每一個map分區依賴的數據量是不一樣的,舉個極端的例子,假設reduce端某個分區依賴於10個map端的輸出分區,可是其中一個分區依賴的數據有10000條,而其餘分區依賴的數據只有1條,這種狀況下,顯然咱們應該吧這個reduce任務優先調度到那個依賴了10000條的executor上。固然這個例子舉得很簡單,可能也不是什麼準確,可是也足夠說明這個方法的做用。接口
def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int) : Seq[String] = { // 首先判斷幾個參數配置,若是都符合條件,那麼再進行偏向位置的計算 if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD && dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) { // 關鍵調用 val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId, dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION) if (blockManagerIds.nonEmpty) { blockManagerIds.get.map(_.host) } else { Nil } } else { Nil } }
能夠看出來,關鍵的方法是getLocationsWithLargestOutputs,接下來,咱們就來看一下這個方法:
註釋已經說得很清楚,這個方法的邏輯很簡單,好比一個reduce端分區要讀取的總數據量是100m, 某個executor上的全部map輸出中與這個reduce分區相關的數據加起來有20m,即超過了總量的0.2,這時這個executor就可以成爲偏向位置,是否是很簡單。可是這裏應該注意到一個問題,這個方法是以executor爲最小單位計算偏向位置,而在前一個方法getPreferredLocationsForShuffle中,獲取到成爲偏向位置的那些BlockManagerId後,僅僅是取出了host做爲偏向位置返回給上層調用者,問題在於一個host(即物理節點)上可能有多個executor,這就會形成返回的結果中會有重複的host,;另外,既然返回host做爲偏向位置,那爲何不直接以host做爲最小單位來計算偏向位置呢,好比將一個host上全部與這個reduce分區相關的數據加起來,若是超過0.2的佔比就認爲這個host可以做爲偏向位置,這樣好像更合理,也更容易產生偏向位置。舉個極端的例子,一個host上運行了5個executor,每一個executor與分區相關的數據佔比0.1,另外有5個host上每一個都只運行了一個executor,他們的數據佔比均爲0.1,這種狀況下是不會產生偏向位置的,可是實際上顯然應該將那個擁有5個executor的host做爲偏向位置。
def getLocationsWithLargestOutputs( shuffleId: Int, reducerId: Int, numReducers: Int, fractionThreshold: Double) : Option[Array[BlockManagerId]] = { val shuffleStatus = shuffleStatuses.get(shuffleId).orNull // 對shuffleStatus非空檢查 if (shuffleStatus != null) { shuffleStatus.withMapStatuses { statuses => // 對mapStatus數組的非空檢查 if (statuses.nonEmpty) { // HashMap to add up sizes of all blocks at the same location // 記錄每一個executor上的全部map輸出的block中屬於這個reduce端分區的數據量 val locs = new HashMap[BlockManagerId, Long] var totalOutputSize = 0L var mapIdx = 0 while (mapIdx < statuses.length) { val status = statuses(mapIdx) // status may be null here if we are called between registerShuffle, which creates an // array with null entries for each output, and registerMapOutputs, which populates it // with valid status entries. This is possible if one thread schedules a job which // depends on an RDD which is currently being computed by another thread. if (status != null) { val blockSize = status.getSizeForBlock(reducerId) if (blockSize > 0) { locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize totalOutputSize += blockSize } } mapIdx = mapIdx + 1 } // 最後,判斷一個executor可否成爲偏向位置的條件是: // 這個executor上全部與這個reduce分區相關的數據大小與這個分區數據總量的比值是否大於一個閾值 // 這個閾值默認是0.2 val topLocs = locs.filter { case (loc, size) => size.toDouble / totalOutputSize >= fractionThreshold } // Return if we have any locations which satisfy the required threshold if (topLocs.nonEmpty) { return Some(topLocs.keys.toArray) } } } } None }
國際慣例,再晚也要總結一下。咱們簡單總結一下map輸出追蹤器的做用: