Shuffle Map的過程,即Shuffle Stage的ShuffleTask按照必定的規則將數據寫到相應的文件中,並把寫的文件"位置信息" 以MapOutput返回給DAGScheduler ,MapOutput將它更新到特定位置就完成了整個Shuffle Map過程. 在Spark中,Shuffle reduce過程抽象化爲ShuffledRDD,即這個RDD的compute方法計算每個分片即每個reduce的數據是經過拉取ShuffleMap輸出的文件並返回Iterator來實現的html
二者差異不大,度分爲map和reduce兩個階段。node
從 high-level 的角度來看,二者並無大的差異。 都是將 mapper(Spark 裏是 ShuffleMapTask)的輸出進行 partition,不一樣的 partition 送到不一樣的 reducer(Spark 裏 reducer 多是下一個 stage 裏的 ShuffleMapTask,也多是 ResultTask)。Reducer 之內存做緩衝區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好之後進行 reduce() (Spark 裏多是後續的一系列操做)。算法
差異較大,Hadoop在Map和reduce階段都有排序操做,而spark默認使用hash進行聚合,不會提早進行排序操做。編程
從 low-level 的角度來看,二者差異不小。 Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在於 combine/reduce() 能夠處理大規模的數據,由於其輸入數據能夠經過外排獲得(mapper 對每段數據先作排序,reducer 的 shuffle 對排好序的每段數據作歸併)。目前的 Spark 默認選擇的是 hash-based,一般使用 HashMap 來對 shuffle 來的數據進行 aggregate,不會對數據進行提早排序。若是用戶須要通過排序的數據,那麼須要本身調用相似 sortByKey() 的操做緩存
mapreduce將處理流程進行細化出map,shuffle,sort,reduce等幾個階段,而spark只有一個stage和一系列的transformation()網絡
Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。每一個階段各司其職,能夠按照過程式的編程思想來逐一實現每一個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不一樣的 stage 和一系列的 transformation(),因此 spill, merge, aggregate 等操做須要蘊含在 transformation() 中。數據結構
爲了分析方便,假定每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。併發
shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「分類」。所謂「分類」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。參見下面HashShuffleManager圖示。app
參見: ShuffleMapStage函數
MapOutputTracker :是爲MapOutput提供一個訪問入口,提供了註冊和獲取MapStatus的接口。
MapOutputTracker能夠把每一個Map輸出的MapStatus註冊到Tracker,同時Tracker也提供了訪問接口,能夠從該Tracker中讀取指定每一個ShuffleID所對應的map輸出的位置;
同時MapOutputTracker也是主從結構,其中Master提供了將Map輸出註冊到Tracker的入口, slave運行在每一個Executor上,提供讀取入口, 可是這個讀取過程須要和Master進行交互,將指定的 ShuffleID所對應的MapStatus信息從Master中fetch過來;
參見: 提交stage
driver端,記錄shuffle信息
MapStatus數據記錄的格式:{shuffleId,mapId,MapStatus}
每一個Shuffle都對應一個ShuffleID,該ShuffleID下面對應多個MapID,每一個MapID都會輸出一個MapStatus,經過該MapStatus,能夠定位每一個 MapID所對應的ShuffleMapTask運行過程當中所對應的機器
經過shuffleID進行索引,存儲了全部註冊到tracker的Shuffle, 經過registerShuffle能夠進行註冊Shuffle, 經過registerMapOutput能夠在每次ShuffleMapTask結束之後,將Map的輸出註冊到Track中; 同時提供了getSerializedMapOutputStatuses接口 將一個Shuffle全部的MapStatus進行序列化並進行返回;
class MapOutputTrackerMaster{ val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala val mapStatuses = new Array[MapStatus](numPartitions) // 在建立stage時,初始化ShuffleStatus def registerShuffle(shuffleId: Int, numMaps: Int) { shuffleStatuses.put( shuffleId,new ShuffleStatus(numMaps)) } // 將MapTask的輸出註冊到Track中 def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { **// {shuffleId,mapId,MapStatus}** shuffleStatuses(shuffleId).addMapOutput(mapId, status) } def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized { mapStatuses(mapId) = status } } // mapStatus中包含了task運行位置,partitions數量等信息 MapStatus{ def location: BlockManagerId def getSizeForBlock(reduceId: Int): Long }
excutor端獲取shuffle信息,注意:local模式下是直接從trackerMaster獲取信息的(worker和master擁有相同的父類,local模式下直接獲取不用再走RPC調用)
MapOutputTrackerWorker的實現很簡單,核心功能就是getServerStatuses, 它獲取指定Shuffle的每一個reduce所對應的MapStatus信息
class MapOutputWorker{ def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { // 根據shuffleId獲取MapStatus集合 val statuses = getStatuses(shuffleId) // 根據shuffleId和起始分區,從mapStatus獲取響應的blockManager信息 MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } // 發送消息給trackerMaster,獲取mapOutPut信息 def askTracker{ var trackerEndpoint: RpcEndpointRef = _ trackerEndpoint.askSync[T](message) }
ShuffleMapTask負責寫數據操做,最後會生成.data和.index文件,在執行完畢後返回一個MapStatus對象。
ShuffleMapTask在excutor上獲取到具體的writer後進行實際的寫操做
class ShuffleMapTask extends Task( def runTask(context: TaskContext): MapStatus = { // 反序列化接收到的數據 val (rdd, dep) = closureSerializer.deserialize( ByteBuffer.wrap(taskBinary.value)) // 調用ShuffleManager的getWriter方法獲取一組writer writer = manager.getWriter(dep.shuffleHandle, partitionId, context) // 遍歷RDD進行write writer.write() } }
參見: writer
Get a writer for a given partition. Called on executors by map tasks.
由於Shuffle過程當中須要將Map結果數據輸出到文件,因此須要經過註冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,經過它來控制Map階段記錄數據輸出的行爲。其中,ShuffleHandle包含了以下基本信息:
shuffleId:標識Shuffle過程的惟一ID numMaps:RDD對應的Partitioner指定的Partition的個數,也就是ShuffleMapTask輸出的Partition個數 dependency:RDD對應的依賴ShuffleDependency
class SortShuffleManager{ def getWriter(){ handle match { case SerializedShuffleHandle=> new UnsafeShuffleWriter() case BypassMergeSortShuffleHandle=> new BypassMergeSortShuffleWriter() case BaseShuffleHandle=> new SortShuffleWriter() } }
若是ShuffleDependency中的Serializer,容許對將要輸出數據對象進行排序後,再執行序列化寫入到文件,則會選擇建立一個SerializedShuffleHandle,生成一個UnsafeShuffleWriter
除了上面兩種ShuffleHandle之後,其餘狀況都會建立一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數據。
數據記錄格式:
// shuffle_shuffleId_mapId_reducId shuffle_2901_11825_0.data shuffle_2901_11825_0.index
數據格式有兩種,若是不須要合併則使用buffer,若是須要合併使用map
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ map.changeValue() }else{ buffer.insert() } } }
2.2.1.3.1.1.1. map
在Map階段會執行Combine操做,在Map階段進行Combine操做可以下降Map階段數據記錄的總數,從而下降Shuffle過程當中數據的跨網絡拷貝傳輸。這時,RDD對應的ShuffleDependency須要設置一個Aggregator用來執行Combine操做
map是內存數據結構,最重要的是update函數和map的changeValue方法(這裏的map對應的實現類是PartitionedAppendOnlyMap)。update函數所作的工做,其實就是對createCombiner和mergeValue這兩個函數的使用,第一次遇到一個Key調用createCombiner函數處理,非首次遇到同一個Key對應新的Value調用mergeValue函數進行合併處理。map的changeValue方法主要是將Key和Value在map中存儲或者進行修改(對出現的同一個Key的多個Value進行合併,並將合併後的新Value替換舊Value)。 PartitionedAppendOnlyMap是一個通過優化的哈希表,它支持向map中追加數據,以及修改Key對應的Value,可是不支持刪除某個Key及其對應的Value。它可以支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當達到指定存儲容量或者指定限制,就會將map中記錄數據Spill到磁盤文件,這個過程和前面的相似
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ // 定義一個aggtregator函數 val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } // 使用update函數實現對新增元素的合併操做 map.changeValue((getPartition(kv._1), kv._1), update) }else{ buffer.insert() } } }
2.2.1.3.1.1.2. buffer
map端不須要排序時使用的數據存儲格式
Map階段不進行Combine操做,在內存中緩存記錄數據會使用PartitionedPairBuffer這種數據結構來緩存、排序記錄數據,它是一個Append-only Buffer,僅支持向Buffer中追加數據鍵值對記錄
組裝完數據後寫磁盤
class ExternalSorter{ map = new PartitionedAppendOnlyMap[K, C] buffer = new PartitionedPairBuffer[K, C] def insertAll{ if(shouldCombine){ maybeSpillCollection(usingMap = true) }else{ maybeSpillCollection(usingMap = false) } } }
shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task爲下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。
shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果
參見: ResultTask
調用ShuffleManager經過getReader方法獲取具體的Reader,去讀數據。
class ShuffledRDD { def compute(){ val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] } }
ShuffleManager是shuffle整個過程的管理入口,對外提供讀寫等接口。
ShuffleManager在driver中建立,driver用ShuffleManager進行註冊shuffle,執行讀寫操做等
對Shuffle作了什麼優化來提供Spark的性能,本質上就是對ShuffleManager進行優化和提供新的實現
spark2.2.0中已取消對HashShuffleManager的支持 新增了tungsten-sort。
ShuffleManager有兩種實現HashShuffleManager和SorShuffleManager,1.1一會的版本默認是SortShuffleManger,可經過 conf.get("spark.shuffle.manager", "sort") 修改默認的shuffle實現方式
SortShuffleManager和HashShuffleManager有一個本質的差異,即同一個map的多個reduce的數據都寫入到同一個文件中;那麼SortShuffleManager產生的Shuffle 文件個數爲2*Map個數
// shuffleManger提供的功能 private[spark] trait ShuffleManager { // shuffle註冊 def registerShuffle(shuffleId: Int, numMaps: Int,dependency: ShuffleDependency): ShuffleHandle // shuffle註銷 def unregisterShuffle(shuffleId: Int): Boolean // mapTask返回一組Writer def getWriter(handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter // 提供Start分區編號和end分區編號;固然通常狀況若是每一個reduce單獨運行,那麼start-end區間也只對應一個reduce def getReader(handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader def shuffleBlockManager: ShuffleBlockManager def stop(): Unit }
spark2.2.0中已取消對HashShuffleManager的支持 (SPARK-14667)。參考:http://lxw1234.com/archives/2016/05/666.htm
HashShuffleManager是Spark最先版本的ShuffleManager,該ShuffleManager的嚴重缺點是會產生太多小文件,特別是reduce個數不少時候,存在很大的性能瓶頸。
最第一版本:ShuffleMapTask個數×reduce個數 後期版本: 併發的ShuffleMapTask的個數爲M xreduce個數
參考:http://shiyanjun.cn/archives/1655.html
根據partition的起止位置,從別的節點獲取blockURL,node信息組成reader,
class BlockStoreShuffleReader[K, C]( handle: BaseShuffleHandle[K, _, C], startPartition: Int, endPartition: Int, context: TaskContext, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) extends ShuffleReader[K, C] with Logging { var blocksByAddress= mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) // 根據獲取到的block信息,給trackerMaster發送消息,獲取RDD數據 new ShuffleBlockFetcherIterator( blocksByAddress ) } class MapOutputTracker{ // excutor在計算ShuffleRDD時調用,返回{blocak地址,Seq{blockID,和輸出數據大小}}等 def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] } class ShuffleBlockFetcherIterator{ val results = new LinkedBlockingQueue[FetchResult] // 負責發送請求和接收數據 def sendRequest(req: FetchRequest){ // 將接收到的數據放入到隊列中 results.put(new SuccessFetchResult( blockId: BlockId, address: BlockManagerId, size: Long, **buf: ManagedBuffer,** isNetworkReqDone: Boolean )) } }
參見: writer
Shuffle過程當中須要將Map結果數據輸出到文件,因此須要經過註冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,經過它來控制Map階段記錄數據輸出的行爲
sparkCore源碼解析系列: