本篇,咱們來看一下spark內核中另外一個重要的模塊,Shuffle管理器ShuffleManager。shuffle能夠說是分佈式計算中最重要的一個概念了,數據的join,聚合去重等操做都須要這個步驟。另外一方面,spark之因此比mapReduce的性能高其中一個主要的緣由就是對shuffle過程的優化,一方面spark的shuffle過程更好地利用內存(也就是咱們前面在分析內存管理時所說的執行內存),另外一方面對於shuffle過程當中溢寫的磁盤文件歸併排序和引入索引文件。固然,spark性能高的另外一個主要緣由還有對計算鏈的優化,把多步map類型的計算chain在一塊兒,大大減小中間過程的落盤,這也是spark顯著區別於mr的地方。
spark新版本的Shuffle管理器默認是SortShuffleManager。java
SparkEnv初始化部分的代碼:apache
val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
看shuffle管理器的源碼,咱們首先應該ShuffleManager的調用時機。想一下shuffle的過程,無非就是兩個步驟,寫和讀。寫是在map階段,將數據按照必定的分區規則歸類到不一樣的分區中,讀是在reduce階段,每一個分區從map階段的輸出中拉取屬於本身的數據,因此咱們分析ShuffleManager源碼基本也能夠沿着這個思路。咱們先來分析寫的過程,由於對於一個完整的shuffle過程,確定是先寫而後纔讀的。
回顧一下以前的對做業運行過程的分析,咱們應該還記得做業被切分紅任務後是在executor端執行的,而Shuffle階段的的stage被切分紅了ShuffleMapTask,shuffle的寫過程正是在這個類中完成的,咱們看一下代碼:api
能夠看到經過ShuffleManager.getWriter獲取了一個shuffle寫入器,從而將rdd的計算數據寫入磁盤。數組
override def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() // 反序列化RDD和shuffle, 關鍵的步驟 // 這裏思考rdd和shuffle反序列化時,內部的SparkContext對象是怎麼反序列化的 val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L var writer: ShuffleWriter[Any, Any] = null try { // shuffle管理器 val manager = SparkEnv.get.shuffleManager // 獲取一個shuffle寫入器 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 這裏能夠看到rdd計算的核心方法就是iterator方法 // SortShuffleWriter的write方法能夠分爲幾個步驟: // 將上游rdd計算出的數據(經過調用rdd.iterator方法)寫入內存緩衝區, // 在寫的過程當中若是超過 內存閾值就會溢寫磁盤文件,可能會寫多個文件 // 最後將溢寫的文件和內存中剩餘的數據一塊兒進行歸併排序後寫入到磁盤中造成一個大的數據文件 // 這個排序是先按分區排序,在按key排序 // 在最後歸併排序後寫的過程當中,沒寫一個分區就會手動刷寫一遍,並記錄下這個分區數據在文件中的位移 // 因此實際上最後寫完一個task的數據後,磁盤上會有兩個文件:數據文件和記錄每一個reduce端partition數據位移的索引文件 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 主要是刪除中間過程的溢寫文件,向內存管理器釋放申請的內存 writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }
這裏根據shuffle類型獲取不一樣的ShuffleWriter對象,大多數狀況下,都是SortShuffleWriter類型,因此咱們直接看SortShuffleWriter.write方法。數據結構
/** Get a writer for a given partition. Called on executors by map tasks. */ // 獲取一個shuffle存儲器,在executor端被調用,在執行一個map task調用 override def getWriter[K, V]( handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { numMapsForShuffle.putIfAbsent( handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) val env = SparkEnv.get handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, context, env.conf) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) } }
總結一下這個方法的主要邏輯:app
在stop方法中還會作一些收尾工做,統計磁盤io耗時,刪除中間溢寫文件分佈式
override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { // map端進行合併的狀況,此時用戶應該提供聚合器和順序 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 將map數據所有寫入排序器中, // 這個過程當中可能會生成多個溢寫文件 sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). // mapId就是shuffleMap端RDD的partitionId // 獲取這個map分區的shuffle輸出文件名 val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) // 加一個uuid後綴 val tmp = Utils.tempFileWith(output) try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) // 這一步將溢寫到的磁盤的文件和內存中的數據進行歸併排序, // 並溢寫到一個文件中,這一步寫的文件是臨時文件名 val partitionLengths = sorter.writePartitionedFile(blockId, tmp) // 這一步主要是寫入索引文件,使用move方法原子第將臨時索引和臨時數據文件重命名爲正常的文件名 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) // 返回一個狀態對象,包含shuffle服務Id和各個分區數據在文件中的位移 mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } }
咱們首先看一下獲取shuffle輸出文件名,是經過IndexShuffleBlockResolver組件獲取的,而它的內部又是經過BlockManager內部的DiskBlockManager分配文件名的,這個DiskBlockManager我在以前分析塊管理器時提到過,它的做用就是管理文件名的分配,以及spark使用的目錄,子目錄的建立刪除等。咱們看到對於數據文件和索引文件的命名規則是不同的,他們的命名規則分別定義在ShuffleDataBlockId和ShuffleIndexBlockId中。ide
def getDataFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } private def getIndexFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) }
咱們根據SortShuffleWriter中的調用順序,首先看一下ExternalSorter.insertAll方法:性能
而後將數據一條一條地循環插入內存的存儲結構中,同時考慮到map端合併的狀況優化
def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined // 在map端進行合併的狀況 if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() // 向內存緩衝中插入一條數據 map.changeValue((getPartition(kv._1), kv._1), update) // 若是緩衝超過閾值,就會溢寫到磁盤生成一個文件 // 每寫入一條數據就檢查一遍內存 maybeSpillCollection(usingMap = true) } } else {// 再也不map端合併的狀況 // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } }
咱們看一個稍微複雜一點的結構,AppendOnlyMap,
能夠看出,這個結構設計仍是很精良的,這裏面有個很重的方法,incrementSize方法中會判斷當前數據量的大小,若是超過閾值就會擴容,這個擴容的方法比較複雜,就是一個從新hash再分佈的過程,不過有一點,發不管是在插入新數據仍是從新hash再分佈的過程當中,對於hash碰撞的處理策略必定要相同,不然可能形成不一致。
// 向數組中插入一個kv對, def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] // 處理key爲空的狀況 if (k.eq(null)) { // 若是是第一次插入空值,那麼須要將大小增長1 if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) haveNullValue = true return nullValue } var pos = rehash(k.hashCode) & mask // 線性探測法處理hash碰撞 // 這裏是一個加速的線性探測,即第一次碰撞時走1步, // 第二次碰撞時走2步,第三次碰撞時走3步 var i = 1 while (true) { val curKey = data(2 * pos) if (curKey.eq(null)) {// 若是舊值不存在,直接插入 val newValue = updateFunc(false, null.asInstanceOf[V]) data(2 * pos) = k data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] incrementSize() return newValue } else if (k.eq(curKey) || k.equals(curKey)) {// 若是舊值存在,須要更新 val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] return newValue } else {// 發生hash碰撞,向後探測,跳躍性的探測 val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
咱們回到ExternalSorter的插入方法中,沒插入一條數據都要檢查內存佔用,判斷是否須要溢寫到磁盤,若是須要就溢寫到磁盤。
這個方法裏調用了map.estimateSize來估算當前插入的數據的內存佔用大小,對於內存佔用的追蹤和估算的功能是在SizeTracker特質中實現的,這個特質我在以前分析MemoryStore時提到過,在將對象類型的數據插入內存中時使用了一箇中間態的數據結構DeserializedValuesHolder,它的內部有一個SizeTrackingVector,這個類就是經過繼承SizeTracker特徵從而實現對象大小的追蹤和估算。
private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L if (usingMap) { estimatedSize = map.estimateSize() if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } } else { estimatedSize = buffer.estimateSize() if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } }
首先檢查當前內存佔用是否超過閾值,若是超過會申請一次執行內存,若是沒有申請到足夠的執行內存,那麼依然須要溢寫到磁盤
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false // 每寫入32條數據檢查一次 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold // 向內存管理器申請執行內存 val granted = acquireMemory(amountToRequest) myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection // 若是內存佔用超過了閾值,那麼就須要溢寫 shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) // 溢寫到磁盤 spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory // 釋放內存 releaseMemory() } shouldSpill }
接着上面的方法,
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { // 獲取一個排序後的迭代器 val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) // 將數據寫入磁盤文件中 val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) spills += spillFile }
這個方法返回按照分區和key排序過的迭代器,其具體的排序邏輯在AppendOnlyMap.destructiveSortedIterator中
這段代碼分爲兩塊,首先對數組進行壓緊,是的稀疏的數據所有轉移到數組的頭部;
而後對數組按照比較器進行排序,比較器首先是按照分區進行比較,若是分區相同才按照key進行比較;
而後返回一個迭代器,這個迭代器僅僅是對數組的封裝。經過這個方法,咱們大概知道了AppendonlyMap的排序邏輯。
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = { destroyed = true // Pack KV pairs into the front of the underlying array // 這段代碼將稀疏的數據所有轉移到數組頭部,將數據壓緊 var keyIndex, newIndex = 0 while (keyIndex < capacity) { if (data(2 * keyIndex) != null) { data(2 * newIndex) = data(2 * keyIndex) data(2 * newIndex + 1) = data(2 * keyIndex + 1) newIndex += 1 } keyIndex += 1 } assert(curSize == newIndex + (if (haveNullValue) 1 else 0)) // 根據比較器對數據進行排序 new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator) new Iterator[(K, V)] { var i = 0 var nullValueReady = haveNullValue def hasNext: Boolean = (i < newIndex || nullValueReady) def next(): (K, V) = { if (nullValueReady) { nullValueReady = false (null.asInstanceOf[K], nullValue) } else { val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V]) i += 1 item } } } }
回到ExternalSorter.spill方法中,在獲取了通過排序後 的迭代器以後,咱們就能夠將數據溢寫到磁盤上了。
這個方法的代碼我不貼了,總結一下主要步驟:
總結一下數據經過ExternalSorter向磁盤溢寫的全過程:
總結一下主要的步驟:
循環將數據寫入磁盤,每當一個分區的數據寫完後,進行一次刷寫,將數據從os的文件緩衝區同步到磁盤上,而後獲取此時的文件長度,記錄下每一個分區在文件中的位移
def writePartitionedFile( blockId: BlockId, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics().shuffleWriteMetrics) // 若是前面沒有數據溢寫到磁盤中, // 則只須要將內存中的數據溢寫到磁盤 if (spills.isEmpty) { // Case where we only have in-memory data val collection = if (aggregator.isDefined) map else buffer // 返回排序後的迭代器 val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { val partitionId = it.nextPartition() while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) } // 寫完一個分區刷寫一次 val segment = writer.commitAndGet() // 記錄下分區的數據在文件中的位移 lengths(partitionId) = segment.length } } else {// 有溢寫到磁盤的文件 // We must perform merge-sort; get an iterator by partition and write everything directly. // 封裝一個用於歸併各個溢寫文件以及內存緩衝區數據的迭代器 // TODO 這個封裝的迭代器是實現歸併排序的關鍵 for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { for (elem <- elements) { writer.write(elem._1, elem._2) } // 每寫完一個分區,主動刷寫一次,獲取文件位移, // 這個位移就是寫入的分區的位移, // reduce端在拉取數據時就會根據這個位移直接找到應該拉取的數據的位置 val segment = writer.commitAndGet() lengths(id) = segment.length } } } writer.close() // 寫完後更新一些統計信息 context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) // 返回每一個reduce端分區數據在文件中的位移信息 lengths }
仍然回到SortShuffleWriter.write方法,最後一步調用了IndexShuffleBlockResolver.writeIndexFileAndCommit方法,
這個方法的做用主要是將每一個的分區的位移值寫入到一個索引文件中,並將臨時的索引文件和臨時的數據文件重命名爲正常的文件名(重命名操做是一個原子操做)
我總結shuffle寫數據的過程,能夠分爲兩個主要的步驟: