This article analyzes the implementation of Spark's broadcast mechanism.
The source of the BroadcastManager initialization method is as follows:
The inheritance hierarchy of TorrentBroadcastFactory is as follows:
An interface for all the broadcast implementations in Spark (to allow multiple broadcast implementations). SparkContext uses a BroadcastFactory implementation to instantiate a particular broadcast for the entire Spark job.
That is, it is the interface for all broadcast implementations in Spark. SparkContext uses a BroadcastFactory implementation to instantiate the broadcast used for the entire Spark job. It has a single subclass: TorrentBroadcastFactory.
It has two important methods:
The newBroadcast method is responsible for creating a broadcast variable.
Its main methods are as follows:
newBroadcast instantiates the TorrentBroadcast class.
The unbroadcast method calls the unpersist method of the TorrentBroadcast class.
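The factory shape described above can be sketched in a few lines: an interface with newBroadcast/unbroadcast and a single torrent-style implementation. The names below mirror Spark's BroadcastFactory and TorrentBroadcastFactory, but the Toy* types and their simplified signatures are illustrative stand-ins, not Spark's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Spark's Broadcast[T]: wraps a value and an id.
class ToyBroadcast<T> {
    private final T value;
    private final long id;
    ToyBroadcast(T value, long id) { this.value = value; this.id = id; }
    T value() { return value; }
    long id() { return id; }
}

// Simplified stand-in for BroadcastFactory: create and remove broadcasts.
interface ToyBroadcastFactory {
    <T> ToyBroadcast<T> newBroadcast(T value, long id); // create a broadcast variable
    void unbroadcast(long id);                          // drop its cached state
}

// The single implementation, analogous to TorrentBroadcastFactory.
class ToyTorrentBroadcastFactory implements ToyBroadcastFactory {
    private final Map<Long, ToyBroadcast<?>> live = new HashMap<>();

    public <T> ToyBroadcast<T> newBroadcast(T value, long id) {
        ToyBroadcast<T> b = new ToyBroadcast<>(value, id); // Spark instantiates TorrentBroadcast here
        live.put(id, b);
        return b;
    }

    public void unbroadcast(long id) {
        live.remove(id); // Spark delegates to TorrentBroadcast.unpersist
    }
}
```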
The official documentation describes it as follows:
A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. Broadcast variables are created from a variable v by calling org.apache.spark.SparkContext.broadcast. The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
The interpreter session below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
That is, broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to use efficient broadcast algorithms to reduce communication cost. A broadcast variable is created by calling SparkContext's broadcast method; the broadcast variable is a wrapper around the real variable, and the real object can be retrieved by calling the broadcast object's value method. Once the real object has been broadcast, it must not be modified, to ensure that the data is consistent across all nodes.
The inheritance hierarchy of TorrentBroadcast is as follows:
TorrentBroadcast is the only subclass of Broadcast.
Its documentation reads:
A BitTorrent-like implementation of org.apache.spark.broadcast.Broadcast.
The mechanism is as follows:
The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.
On each executor, the executor first attempts to fetch the object from its BlockManager.
If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.
Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.
This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
The mechanism works as follows:
The driver splits the serialized object into small chunks and stores them in the driver's BlockManager. On each executor, the executor first tries to fetch the data from its own BlockManager; if it is not there, it fetches the small chunks remotely from the driver and/or other executors. Once it has the chunks, it puts them into its own BlockManager, ready to be fetched by other executors. This removes the bottleneck of the driver having to send a full copy of the broadcast data to every executor.
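The local-first, then peer, then driver flow above can be modeled with a few in-memory maps. This is a toy sketch of the data movement only: ToyTorrent, fetchChunk, and the store maps are all illustrative names, not Spark classes, and real Spark does this through the BlockManager over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the torrent-like flow: each executor checks its own store first,
// then pulls a missing chunk from a peer executor that already cached it,
// falling back to the driver; every fetched chunk is cached locally so other
// executors can pull it from here next.
class ToyTorrent {
    static Map<Integer, byte[]> driverStore = new HashMap<>();
    static List<Map<Integer, byte[]>> executorStores = new ArrayList<>();

    // Seed the driver with numChunks chunks and create empty executor stores.
    static void setup(int numChunks, int numExecutors) {
        driverStore.clear();
        executorStores.clear();
        for (int i = 0; i < numChunks; i++) driverStore.put(i, ("chunk-" + i).getBytes());
        for (int i = 0; i < numExecutors; i++) executorStores.add(new HashMap<>());
    }

    static byte[] fetchChunk(int executorId, int chunkId) {
        Map<Integer, byte[]> local = executorStores.get(executorId);
        byte[] chunk = local.get(chunkId);
        if (chunk != null) return chunk;                  // local hit: no network traffic
        for (Map<Integer, byte[]> peer : executorStores)  // prefer a peer that has it
            if (peer != local && peer.containsKey(chunkId)) { chunk = peer.get(chunkId); break; }
        if (chunk == null) chunk = driverStore.get(chunkId); // otherwise go to the driver
        local.put(chunkId, chunk);                        // cache so peers can pull from us
        return chunk;
    }
}
```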
Broadcast data is stored in two forms:
1. One copy of the data is kept in the MemoryStore, stored after deserialization; another copy is kept on disk, where it is first serialized into a byte array by the SerializerManager and then written out.
2. The object is written to an output stream according to blockSize (4 MB by default, configurable via the spark.broadcast.blockSize parameter) and the compression codec (compression is enabled by default and can be disabled via spark.broadcast.compress; the default algorithm is lz4 and can be changed via spark.io.compression.codec), then split into several small chunks, which are finally persisted into the BlockManager: again one copy in the MemoryStore, this time with no deserialization required, and one copy on disk.
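The chunking in form 2 above can be sketched as serialize + compress into one byte stream, then cut into fixed-size pieces, with the reverse path concatenating, decompressing, and deserializing. This is only a rough model of blockifyObject/unBlockifyObject: Spark's default codec is lz4 and its default chunk size is 4 MB, while this sketch uses the JDK's GZIP streams and a tiny chunk size purely for illustration.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of the blockify/unblockify idea with JDK serialization + GZIP.
class Blockify {
    static List<byte[]> blockify(Serializable obj, int blockSize) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(bos))) {
                oos.writeObject(obj);                                // serialize + compress
            }
            byte[] all = bos.toByteArray();
            List<byte[]> chunks = new ArrayList<>();
            for (int off = 0; off < all.length; off += blockSize) {  // split into chunks
                chunks.add(Arrays.copyOfRange(all, off, Math.min(off + blockSize, all.length)));
            }
            return chunks;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object unBlockify(List<byte[]> chunks) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            for (byte[] c : chunks) bos.write(c);                    // concatenate the chunks
            try (ObjectInputStream ois = new ObjectInputStream(
                    new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray())))) {
                return ois.readObject();                             // decompress + deserialize
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```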
The blockifyObject method of TorrentBroadcast is as follows:
The compressing OutputStream decorates the ChunkedByteBufferOutputStream.
When the value method of a broadcast variable is called, TorrentBroadcast's getValue method is invoked, as follows:
The _value field is declared as:
private lazy val _value: T = readBroadcastBlock()
Next, look at the readBroadcastBlock method:
 1 private def readBroadcastBlock(): T = Utils.tryOrIOException {
 2   TorrentBroadcast.synchronized {
 3     val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
 4
 5     Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
 6       setConf(SparkEnv.get.conf)
 7       val blockManager = SparkEnv.get.blockManager
 8       blockManager.getLocalValues(broadcastId) match {
 9         case Some(blockResult) =>
10           if (blockResult.data.hasNext) {
11             val x = blockResult.data.next().asInstanceOf[T]
12             releaseLock(broadcastId)
13
14             if (x != null) {
15               broadcastCache.put(broadcastId, x)
16             }
17
18             x
19           } else {
20             throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
21           }
22         case None =>
23           logInfo("Started reading broadcast variable " + id)
24           val startTimeMs = System.currentTimeMillis()
25           val blocks = readBlocks()
26           logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
27
28           try {
29             val obj = TorrentBroadcast.unBlockifyObject[T](
30               blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
31             // Store the merged copy in BlockManager so other tasks on this executor don't
32             // need to re-fetch it.
33             val storageLevel = StorageLevel.MEMORY_AND_DISK
34             if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
35               throw new SparkException(s"Failed to store $broadcastId in BlockManager")
36             }
37
38             if (obj != null) {
39               broadcastCache.put(broadcastId, obj)
40             }
41
42             obj
43           } finally {
44             blocks.foreach(_.dispose())
45           }
46       }
47     }
48   }
49 }
The source is explained as follows:
Line 3: broadcastManager.cachedValues holds the values of every broadcast. It is a map whose keys are held by strong references and whose values by weak references (so a value can be reclaimed during garbage collection).
Line 5: look the value up in cachedValues by broadcastId; if it is missing, the default function passed to getOrElse is executed.
Line 8: fetch the broadcast value from the local BlockManager (from the MemoryStore or DiskStore; what is fetched here is the complete value, not the post-split chunks). If it is present, release the BlockManager lock and put the value into cachedValues; if not, call readBlocks to fetch the chunks, reassemble them into the broadcast value object, and put that object into cachedValues.
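The strong-key/weak-value cache used on line 3 can be modeled with java.lang.ref.WeakReference: a broadcast value that nothing else references becomes eligible for garbage collection instead of pinning executor memory. Spark builds cachedValues on a third-party reference map; WeakValueCache below is a minimal sketch of the same behavior, not Spark's actual class.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;

// Strong keys, weakly referenced values: the GC may clear a value that no
// other code holds, and get() then returns null as if it were never cached.
class WeakValueCache {
    private final Map<String, WeakReference<Object>> map = new HashMap<>();

    void put(String key, Object value) {
        map.put(key, new WeakReference<>(value));
    }

    Object get(String key) {
        WeakReference<Object> ref = map.get(key);
        return ref == null ? null : ref.get(); // null once the GC has cleared it
    }
}
```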
The readBlocks method is as follows:
 1 /** Fetch torrent blocks from the driver and/or other executors. */
 2 private def readBlocks(): Array[BlockData] = {
 3   // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
 4   // to the driver, so other executors can pull these chunks from this executor as well.
 5   val blocks = new Array[BlockData](numBlocks)
 6   val bm = SparkEnv.get.blockManager
 7
 8   for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
 9     val pieceId = BroadcastBlockId(id, "piece" + pid)
10     logDebug(s"Reading piece $pieceId of $broadcastId")
11     // First try getLocalBytes because there is a chance that previous attempts to fetch the
12     // broadcast blocks have already fetched some of the blocks. In that case, some blocks
13     // would be available locally (on this executor).
14     bm.getLocalBytes(pieceId) match {
15       case Some(block) =>
16         blocks(pid) = block
17         releaseLock(pieceId)
18       case None =>
19         bm.getRemoteBytes(pieceId) match {
20           case Some(b) =>
21             if (checksumEnabled) {
22               val sum = calcChecksum(b.chunks(0))
23               if (sum != checksums(pid)) {
24                 throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
25                   s" $sum != ${checksums(pid)}")
26               }
27             }
28             // We found the block from remote executors/driver's BlockManager, so put the block
29             // in this executor's BlockManager.
30             if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
31               throw new SparkException(
32                 s"Failed to store $pieceId of $broadcastId in local BlockManager")
33             }
34             blocks(pid) = new ByteBufferBlockData(b, true)
35           case None =>
36             throw new SparkException(s"Failed to get $pieceId of $broadcastId")
37         }
38     }
39   }
40   blocks
41 }
The source is explained as follows:
Line 14: fetch the chunk from the local BlockManager by pieceId.
Lines 15-17: if the chunk was found locally, record it and release the lock.
Lines 18-34: if there is no local chunk, fetch it remotely by pieceId; after the fetch, verify the checksum, then store the chunk into the local BlockManager.
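Two details of the loop above are worth isolating: pieces are fetched in a random order (the Random.shuffle on line 8), so executors that start at the same time pull different pieces first and can then serve them to each other, and each remotely fetched piece is verified against a precomputed checksum. The sketch below models only these two steps; Spark's calcChecksum is, as far as I know, also Adler32-based, but fetchAll and its in-memory "remote" array are illustrative stand-ins for the real BlockManager calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.Adler32;

// Randomized piece fetching with per-piece checksum verification.
class PieceFetcher {
    static long checksum(byte[] piece) {
        Adler32 sum = new Adler32();
        sum.update(piece, 0, piece.length);
        return sum.getValue();
    }

    static byte[][] fetchAll(byte[][] remote, long[] expected) {
        byte[][] pieces = new byte[remote.length][];
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < remote.length; i++) order.add(i);
        Collections.shuffle(order);              // randomized fetch order, like Random.shuffle
        for (int pid : order) {
            byte[] b = remote[pid];              // stands in for a remote fetch
            if (checksum(b) != expected[pid]) {  // detect corruption in transit
                throw new RuntimeException("corrupt remote block piece" + pid);
            }
            pieces[pid] = b;
        }
        return pieces;
    }
}
```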
Note: this article does not cover the BlockManager operations behind BroadcastManager in more detail; the next article will analyze Spark's storage subsystem.