sparkCore源碼解析之shuffle

shuffle

Shuffle Map的過程,即Shuffle Stage的ShuffleTask按照必定的規則將數據寫到相應的文件中,並把寫的文件"位置信息" 以MapOutput返回給DAGScheduler ,MapOutput將它更新到特定位置就完成了整個Shuffle Map過程. 在Spark中,Shuffle reduce過程抽象化爲ShuffledRDD,即這個RDD的compute方法計算每個分片即每個reduce的數據是經過拉取ShuffleMap輸出的文件並返回Iterator來實現的html

1. 對比MapReduce

img

1.1. 宏觀比較

二者差異不大,度分爲map和reduce兩個階段node

從 high-level 的角度來看,二者並無大的差異。 都是將 mapper(Spark 裏是 ShuffleMapTask)的輸出進行 partition,不一樣的 partition 送到不一樣的 reducer(Spark 裏 reducer 多是下一個 stage 裏的 ShuffleMapTask,也多是 ResultTask)。Reducer 之內存做緩衝區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好之後進行 reduce() (Spark 裏多是後續的一系列操做)。算法

1.2. 微觀比較

差異較大,Hadoop在Map和reduce階段都有排序操做,而spark默認使用hash進行聚合,不會提早進行排序操做。編程

從 low-level 的角度來看,二者差異不小。 Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在於 combine/reduce() 能夠處理大規模的數據,由於其輸入數據能夠經過外排獲得(mapper 對每段數據先作排序,reducer 的 shuffle 對排好序的每段數據作歸併)。目前的 Spark 默認選擇的是 hash-based,一般使用 HashMap 來對 shuffle 來的數據進行 aggregate,不會對數據進行提早排序。若是用戶須要通過排序的數據,那麼須要本身調用相似 sortByKey() 的操做緩存

1.3. 實現方式

mapreduce將處理流程進行細化出map,shuffle,sort,reduce等幾個階段,而spark只有一個stage和一系列的transformation()網絡

Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。每一個階段各司其職,能夠按照過程式的編程思想來逐一實現每一個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不一樣的 stage 和一系列的 transformation(),因此 spill, merge, aggregate 等操做須要蘊含在 transformation() 中。數據結構

2. Map

爲了分析方便,假定每一個Executor只有1個CPU core,也就是說,不管這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。併發

shuffle write階段,主要就是在一個stage結束計算以後,爲了下一個stage能夠執行shuffle類的算子(好比reduceByKey),而將每一個task處理的數據按key進行「分類」。所謂「分類」,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤以前,會先將數據寫入內存緩衝中,當內存緩衝填滿以後,纔會溢寫到磁盤文件中去。參見下面HashShuffleManager圖示。app

2.1. MapStatus的註冊和獲取

img 參見: ShuffleMapStage函數

MapOutputTracker :是爲MapOutput提供一個訪問入口,提供了註冊和獲取MapStatus的接口。

MapOutputTracker能夠把每一個Map輸出的MapStatus註冊到Tracker,同時Tracker也提供了訪問接口,能夠從該Tracker中讀取指定每一個ShuffleID所對應的map輸出的位置;

同時MapOutputTracker也是主從結構,其中Master提供了將Map輸出註冊到Tracker的入口, slave運行在每一個Executor上,提供讀取入口, 可是這個讀取過程須要和Master進行交互,將指定的 ShuffleID所對應的MapStatus信息從Master中fetch過來;

2.1.1. MapOutputTrackerMaster

參見: 提交stage

driver端,記錄shuffle信息

MapStatus數據記錄的格式:{shuffleId,mapId,MapStatus}

每一個Shuffle都對應一個ShuffleID,該ShuffleID下面對應多個MapID,每一個MapID都會輸出一個MapStatus,經過該MapStatus,能夠定位每一個 MapID所對應的ShuffleMapTask運行過程當中所對應的機器

經過shuffleID進行索引,存儲了全部註冊到tracker的Shuffle, 經過registerShuffle能夠進行註冊Shuffle, 經過registerMapOutput能夠在每次ShuffleMapTask結束之後,將Map的輸出註冊到Track中; 同時提供了getSerializedMapOutputStatuses接口 將一個Shuffle全部的MapStatus進行序列化並進行返回;

class MapOutputTrackerMaster{
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala
val mapStatuses = new Array[MapStatus](numPartitions)

// 在建立stage時,初始化ShuffleStatus
def registerShuffle(shuffleId: Int, numMaps: Int) {
    shuffleStatuses.put(
		shuffleId,new ShuffleStatus(numMaps)) 
}

	// 將MapTask的輸出註冊到Track中
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
    **// {shuffleId,mapId,MapStatus}**
	shuffleStatuses(shuffleId).addMapOutput(mapId, status)
  }

  def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
    mapStatuses(mapId) = status
  }
}

// mapStatus中包含了task運行位置,partitions數量等信息
MapStatus{
def location: BlockManagerId
def getSizeForBlock(reduceId: Int): Long
}

2.1.2. MapOutputTrackerWorker

excutor端獲取shuffle信息,注意:local模式下是直接從trackerMaster獲取信息的(worker和master擁有相同的父類,local模式下直接獲取不用再走RPC調用)

MapOutputTrackerWorker的實現很簡單,核心功能就是getServerStatuses, 它獲取指定Shuffle的每一個reduce所對應的MapStatus信息

class MapOutputWorker{
 def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
​      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
// 根據shuffleId獲取MapStatus集合
val statuses = getStatuses(shuffleId)
// 根據shuffleId和起始分區,從mapStatus獲取響應的blockManager信息
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
}
// 發送消息給trackerMaster,獲取mapOutPut信息
def askTracker{
​	var trackerEndpoint: RpcEndpointRef = _
​	trackerEndpoint.askSync[T](message)
}

2.2. 寫數據

ShuffleMapTask負責寫數據操做,最後會生成.data和.index文件,在執行完畢後返回一個MapStatus對象。

ShuffleMapTask在excutor上獲取到具體的writer後進行實際的寫操做

class ShuffleMapTask extends Task(
def runTask(context: TaskContext): MapStatus = {
	// 反序列化接收到的數據
    val (rdd, dep) = closureSerializer.deserialize(
      ByteBuffer.wrap(taskBinary.value))

	// 調用ShuffleManager的getWriter方法獲取一組writer
 writer = manager.getWriter(dep.shuffleHandle, partitionId, context)
	 // 遍歷RDD進行write
    writer.write()
}
}

2.2.1. writer

img 參見: writer

Get a writer for a given partition. Called on executors by map tasks.

由於Shuffle過程當中須要將Map結果數據輸出到文件,因此須要經過註冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,經過它來控制Map階段記錄數據輸出的行爲。其中,ShuffleHandle包含了以下基本信息:

shuffleId:標識Shuffle過程的惟一ID numMaps:RDD對應的Partitioner指定的Partition的個數,也就是ShuffleMapTask輸出的Partition個數 dependency:RDD對應的依賴ShuffleDependency

class SortShuffleManager{
def getWriter(){
​	handle match {
​      case SerializedShuffleHandle=>
​			new UnsafeShuffleWriter()
​	  case BypassMergeSortShuffleHandle=>
​			new BypassMergeSortShuffleWriter()
​	  case BaseShuffleHandle=>
​			 new SortShuffleWriter()
}
}

2.2.1.1. BypassMergeSortShuffleWriter

  1. 按照hash方式排序
  2. 每一個partition產生一個file,而後將相同task產生的文件進行合併。blocks的偏移量被單獨存放在一個索引文件中
  3. 經過IndexShuffleBlockResolver對寫入的數據進行緩存
  4. 使用場景:
    1. 不用排序
    2. 沒有聚合函數
    3. 分區數量少於設置的閾值 spark.shuffle.sort.bypassMergeThreshold默認值是200

2.2.1.2. UnsafeShuffleWriter

若是ShuffleDependency中的Serializer,容許對將要輸出數據對象進行排序後,再執行序列化寫入到文件,則會選擇建立一個SerializedShuffleHandle,生成一個UnsafeShuffleWriter

2.2.1.3. SortShuffleWriter

除了上面兩種ShuffleHandle之後,其餘狀況都會建立一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數據。

數據記錄格式:

// shuffle_shuffleId_mapId_reducId
shuffle_2901_11825_0.data
shuffle_2901_11825_0.index
2.2.1.3.1. 寫文件
2.2.1.3.1.1. 數據格式

img

數據格式有兩種,若是不須要合併則使用buffer,若是須要合併使用map

class ExternalSorter{
map = new PartitionedAppendOnlyMap[K, C]
buffer = new PartitionedPairBuffer[K, C]
def insertAll{
​	if(shouldCombine){
​		map.changeValue()
​	}else{
​		 buffer.insert()
​	}
}
}

2.2.1.3.1.1.1. map

在Map階段會執行Combine操做,在Map階段進行Combine操做可以下降Map階段數據記錄的總數,從而下降Shuffle過程當中數據的跨網絡拷貝傳輸。這時,RDD對應的ShuffleDependency須要設置一個Aggregator用來執行Combine操做

map是內存數據結構,最重要的是update函數和map的changeValue方法(這裏的map對應的實現類是PartitionedAppendOnlyMap)。update函數所作的工做,其實就是對createCombiner和mergeValue這兩個函數的使用,第一次遇到一個Key調用createCombiner函數處理,非首次遇到同一個Key對應新的Value調用mergeValue函數進行合併處理。map的changeValue方法主要是將Key和Value在map中存儲或者進行修改(對出現的同一個Key的多個Value進行合併,並將合併後的新Value替換舊Value)。 PartitionedAppendOnlyMap是一個通過優化的哈希表,它支持向map中追加數據,以及修改Key對應的Value,可是不支持刪除某個Key及其對應的Value。它可以支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當達到指定存儲容量或者指定限制,就會將map中記錄數據Spill到磁盤文件,這個過程和前面的相似

class ExternalSorter{
map = new PartitionedAppendOnlyMap[K, C]
buffer = new PartitionedPairBuffer[K, C]
def insertAll{
​	if(shouldCombine){
// 定義一個aggtregator函數
val mergeValue = aggregator.get.mergeValue
​      val createCombiner = aggregator.get.createCombiner
​      var kv: Product2[K, V] = null
​      val update = (hadValue: Boolean, oldValue: C) => {
​        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
​      }
// 使用update函數實現對新增元素的合併操做
map.changeValue((getPartition(kv._1), kv._1), update)
​	}else{
​		 buffer.insert()
​	}
}
}

2.2.1.3.1.1.2. buffer

map端不須要排序時使用的數據存儲格式

Map階段不進行Combine操做,在內存中緩存記錄數據會使用PartitionedPairBuffer這種數據結構來緩存、排序記錄數據,它是一個Append-only Buffer,僅支持向Buffer中追加數據鍵值對記錄

  1. buffer大小:默認64,最大2 ^ 30 - 1
2.2.1.3.1.2. spill

組裝完數據後寫磁盤

class ExternalSorter{
map = new PartitionedAppendOnlyMap[K, C]
buffer = new PartitionedPairBuffer[K, C]
def insertAll{
​	if(shouldCombine){
​		maybeSpillCollection(usingMap = true)
​	}else{
​		 maybeSpillCollection(usingMap = false)
​	}
}
}
2.2.1.3.2. 建索引

2.2.2. 寫順序

3. reduce

shuffle read,一般就是一個stage剛開始時要作的事情。此時該stage的每個task就須要將上一個stage的計算結果中的全部相同key,從各個節點上經過網絡都拉取到本身所在的節點上,而後進行key的聚合或鏈接等操做。因爲shuffle write的過程當中,task爲下游stage的每一個task都建立了一個磁盤文件,所以shuffle read的過程當中,每一個task只要從上游stage的全部task所在節點上,拉取屬於本身的那一個磁盤文件便可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每一個shuffle read task都會有一個本身的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,而後經過內存中的一個Map進行聚合等操做。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操做。以此類推,直到最後將全部數據到拉取完,並獲得最終的結果

3.1. 讀數據

3.2. reduce端獲取

參見: ResultTask

調用ShuffleManager經過getReader方法獲取具體的Reader,去讀數據。

class ShuffledRDD {
​	def compute(){
​		 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
​    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
​      .read()
​      .asInstanceOf[Iterator[(K, C)]]
​	}
}

4. shuffle管理入口

img

ShuffleManager是shuffle整個過程的管理入口,對外提供讀寫等接口。

ShuffleManager在driver中建立,driver用ShuffleManager進行註冊shuffle,執行讀寫操做等

對Shuffle作了什麼優化來提供Spark的性能,本質上就是對ShuffleManager進行優化和提供新的實現

spark2.2.0中已取消對HashShuffleManager的支持 新增了tungsten-sort。

ShuffleManager有兩種實現HashShuffleManager和SorShuffleManager,1.1一會的版本默認是SortShuffleManger,可經過 conf.get("spark.shuffle.manager", "sort") 修改默認的shuffle實現方式

SortShuffleManager和HashShuffleManager有一個本質的差異,即同一個map的多個reduce的數據都寫入到同一個文件中;那麼SortShuffleManager產生的Shuffle 文件個數爲2*Map個數

// shuffleManger提供的功能
private[spark] trait ShuffleManager {
 // shuffle註冊
  def registerShuffle(shuffleId: Int, numMaps: Int,dependency: ShuffleDependency): ShuffleHandle
// shuffle註銷
  def unregisterShuffle(shuffleId: Int): Boolean
// mapTask返回一組Writer
  def getWriter(handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter
// 提供Start分區編號和end分區編號;固然通常狀況若是每一個reduce單獨運行,那麼start-end區間也只對應一個reduce
  def getReader(handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader

 

  def shuffleBlockManager: ShuffleBlockManager

  def stop(): Unit
}

4.1. HashShuffleManager

spark2.2.0中已取消對HashShuffleManager的支持 (SPARK-14667)。參考:http://lxw1234.com/archives/2016/05/666.htm

HashShuffleManager是Spark最先版本的ShuffleManager,該ShuffleManager的嚴重缺點是會產生太多小文件,特別是reduce個數不少時候,存在很大的性能瓶頸。

最第一版本:ShuffleMapTask個數×reduce個數 後期版本: 併發的ShuffleMapTask的個數爲M xreduce個數

4.2. SortShuffleManager

img

參考:http://shiyanjun.cn/archives/1655.html

img

4.2.1. reader

4.2.1.1. BlockStoreShuffleReader

根據partition的起止位置,從別的節點獲取blockURL,node信息組成reader,

class BlockStoreShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    serializerManager: SerializerManager = 		SparkEnv.get.serializerManager,
    blockManager: BlockManager = 		SparkEnv.get.blockManager,
    mapOutputTracker: MapOutputTracker = 		SparkEnv.get.mapOutputTracker)
  extends ShuffleReader[K, C] with Logging {

	var blocksByAddress=
	mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
	
	// 根據獲取到的block信息,給trackerMaster發送消息,獲取RDD數據
	new ShuffleBlockFetcherIterator(
		blocksByAddress
	)
}

class MapOutputTracker{
// excutor在計算ShuffleRDD時調用,返回{blocak地址,Seq{blockID,和輸出數據大小}}等
	def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]

}

class ShuffleBlockFetcherIterator{
	val results = 
		new LinkedBlockingQueue[FetchResult]
	// 負責發送請求和接收數據
	def sendRequest(req: FetchRequest){
		
	// 將接收到的數據放入到隊列中
	results.put(new SuccessFetchResult(
	  blockId: BlockId,
      address: BlockManagerId,
      size: Long,
      **buf: ManagedBuffer,**
      isNetworkReqDone: Boolean
	))
	}
}

4.2.2. writer

參見: writer

Shuffle過程當中須要將Map結果數據輸出到文件,因此須要經過註冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,經過它來控制Map階段記錄數據輸出的行爲

sparkCore源碼解析系列:

  1. sparkCore源碼解析之block
  2. sparkCore源碼解析之partition
  3. sparkCore源碼解析之Job
  4. sparkCore源碼解析之shuffle
  5. sparkCore源碼解析之完整腦圖地址
相關文章
相關標籤/搜索