本系列文章源自JerryLead的SparkInternals,本文只是在做者的原文基礎上加入本身的理解,批註,和部分源碼,做爲學習之用
注:原文是基於Spark 1.0.2 , 而本篇筆記是基於spark 2.2.0, 對比後發現核心部分變化不大,依舊值得參考node
顧名思義,broadcast 就是將數據從一個節點發送到其餘各個節點上去。這樣的場景不少,好比 driver 上有一張表,其餘節點上運行的 task 須要 lookup 這張表,那麼 driver 能夠先把這張表 copy 到這些節點,這樣 task 就能夠在本地查表了。如何實現一個可靠高效的 broadcast 機制是一個有挑戰性的問題。先看看 Spark 官網上的一段話:git
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.github
這就涉及一致性的問題,若是變量能夠被更新,那麼一旦變量被某個節點更新,其餘節點要不要一塊更新?若是多個節點同時在更新,更新順序是什麼?怎麼作同步?還會涉及 fault-tolerance 的問題。爲了不維護數據一致性問題,Spark 目前只支持 broadcast 只讀變量。spring
由於每一個 task 是一個線程,並且同在一個進程運行 tasks 都屬於同一個 application。所以每一個節點(executor)上放一份就能夠被全部 task 共享。數組
driver program 例子:緩存
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
複製代碼
driver 使用 sc.broadcast()
聲明要 broadcast 的 data,bdata 的類型是 Broadcast。bash
當 rdd.transformation(func)
須要用 bdata 時,直接在 func 中調用,好比上面的例子中的 map() 就使用了 bdata.value.size。服務器
broadcast 的實現機制頗有意思:markdown
Driver 先建一個本地文件夾用以存放須要 broadcast 的 data,並啓動一個能夠訪問該文件夾的 HttpServer。當調用val bdata = sc.broadcast(data)
時就把 data 寫入文件夾,同時寫入 driver 本身的 blockManger 中(StorageLevel 爲內存+磁盤),得到一個 blockId,類型爲 BroadcastBlockId。網絡
//initialize
sparkSession.build()#env.broadcastManager.initialize()
new TorrentBroadcastFactory.initialize()
//use broadcast
sc.broadcast()
broadcastManager.newBroadcast()
//Divide the object into multiple blocks and put those blocks in the block manager.
new TorrentBroadcast[T](value_, nextBroadcastId.getAndIncrement()).writeBlocks()
//保存一份到driver上
SparkEnv.get.blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)
doPutIterator()#memoryStore.putIteratorAsValues()#diskStore.put(blockId)
//以4m分別保存block("spark.broadcast.blockSize", "4m"),並獲得meta
block MetaDatas = TorrentBroadcast.blockifyObject(value, blockSize..)
foreach block MetaData :
blockManager.putBytes(BroadcastBlockId, MEMORY_AND_DISK_SER...)
doPutBytes()#memoryStore.putIteratorAsValues()#diskStore.putBytes()
//異步複製數據,sc.broadcast()應該只會在driver端保留一份數據,replication=1,後面executorfetch數據時才慢慢增長broadcast的副本數量
if level.replication > 1 :ThreadUtils.awaitReady(replicate(ByteBufferBlockData(bytes, false)...)
//複製副本規則,做爲參考
blockManager.replicate()
//請求得到其餘BlockManager的id
val initialPeers = getPeers(false)
blockManagerMaster.getPeers(blockManagerId).sortBy(_.hashCode)
//從driver上獲取其餘節點
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
//BlockManagerMasterEndpoint中返回非driver和非當前節點的blockManagerId
blockManagerInfo.keySet.contains(blockManagerId)#blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
foreach block replicate replication-1 nodes: blockTransferService.uploadBlockSync()
//後面就是發送信息給blockManager,再保存數據通知driver
blockManager.putBytes()#reportBlockStatus(blockId, putBlockStatus)
blockManagerMasterEndpoint.updateBlockInfo() //driver端更新信息
複製代碼
當調用rdd.transformation(func)
時,若是 func 用到了 bdata,那麼 driver submitTask() 的時候會將 bdata 一同 func 進行序列化獲得 serialized task,注意序列化的時候不會序列化 bdata 中包含的 data。
//TorrentBroadcast.scala 序列化的時候不會序列化 bdata 中包含的 data
// @transient代表不序列化_value
@transient private lazy val _value: T = readBroadcastBlock()
/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}
複製代碼
上一章講到 serialized task 從 driverEndPoint 傳遞到 executor 時使用 RPC 的傳消息機制,消息不能太大,而實際的 data 可能很大,因此這時候還不能 broadcast data。
driver 爲何會同時將 data 放到磁盤和 blockManager 裏面?放到磁盤是爲了讓 HttpServer 訪問到,放到 blockManager 是爲了讓 driver program 自身使用 bdata 時方便(其實我以爲不放到 blockManger 裏面也行)。
那麼何時傳送真正的 data?在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那裏詢問 bdata 的 data 在不在 blockManager 裏面,若是不在就使用下面的兩種 fetch 方式之一去將 data fetch 過來。獲得 data 後,將其存放到 blockManager 裏面,這樣後面運行的 task 若是須要 bdata 就不須要再去 fetch data 了。若是在,就直接拿來用了。
//runjob()
dagScheduler.submitMissingTasks(stage: Stage, jobId: Int)
val taskIdToLocations = getPreferredLocs(stage.rdd, id)-----
getCacheLocs()//從本地或者driver獲取緩存rdd位置
rdd.preferredLocations()//也會從checkpointrdd中尋找
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage => //把func也序列化了,func裏面包含broadcast變量
//不會序列化 broadcast變量 中包含的 data
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)//廣播task
taskScheduler.submitTasks(new TaskSet(...))
...
複製代碼
//TorrentBroadcast.scala
//使用lazy方式,真正反序列化使用_value才調用方法讀值
@transient private lazy val _value: T = readBroadcastBlock()
TorrentBroadcast.readBroadcastBlock()
blockManager.getLocalValues()//本地讀取
memoryStore.getValues(blockId)#diskStore.getBytes(blockId)
readBlocks() //本地無則從driver/其餘executor讀取
foreach block :
blockManager.getRemoteBytes(BroadcastBlockId(id, "piece" + pid))
blockManager.putBytes()//保存在本地
//整個broadcast保存在本地
blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)
blocks.foreach(_.dispose()) //去重,把以前分開保存的block刪除
複製代碼
下面探討 broadcast data 時候的兩種實現方式:
spark 2.2 的Broadcast package中已經去除了HttpBroadcast,只留下了TorrentBroadcast
顧名思義,HttpBroadcast 就是每一個 executor 經過的 http 協議鏈接 driver 並從 driver 那裏 fetch data。
Driver 先準備好要 broadcast 的 data,調用sc.broadcast(data)
後會調用工廠方法創建一個 HttpBroadcast 對象。該對象作的第一件事就是將 data 存到 driver 的 blockManager 裏面,StorageLevel 爲內存+磁盤,blockId 類型爲 BroadcastBlockId。
同時 driver 也會將 broadcast 的 data 寫到本地磁盤,例如寫入後獲得 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0
, 這個文件夾做爲 HttpServer 的文件目錄。
Driver 和 executor 啓動的時候,都會生成 broadcastManager 對象,調用 HttpBroadcast.initialize(),driver 會在本地創建一個臨時目錄用來存放 broadcast 的 data,並啓動能夠訪問該目錄的 httpServer。
Fetch data:在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那裏詢問 bdata 的 data 在不在 blockManager 裏面,若是不在就使用 http 協議鏈接 driver 上的 httpServer,將 data fetch 過來。獲得 data 後,將其存放到 blockManager 裏面,這樣後面運行的 task 若是須要 bdata 就不須要再去 fetch data 了。若是在,就直接拿來用了。
HttpBroadcast 最大的問題就是 driver 所在的節點可能會出現網絡擁堵,由於 worker 上的 executor 都會去 driver 那裏 fetch 數據。
爲了解決 HttpBroadast 中 driver 單點網絡瓶頸的問題,Spark 又設計了一種 broadcast 的方法稱爲 TorrentBroadcast,這個相似於你們經常使用的 BitTorrent 技術。基本思想就是將 data 分塊成 data blocks,而後假設有 executor fetch 到了一些 data blocks,那麼這個 executor 就能夠被看成 data server 了,隨着 fetch 的 executor 愈來愈多,有更多的 data server 加入,data 就很快能傳播到所有的 executor 那裏去了。
HttpBroadcast 是經過傳統的 http 協議和 httpServer 去傳 data,在 TorrentBroadcast 裏面使用在上一章介紹的 blockManager.getRemoteValues() => NIO ShuffleClient 傳數據的方法來傳遞,讀取數據的過程與讀取 cached rdd 的方式相似,能夠參閱 CacheAndCheckpoint 中的最後一張圖。
下面討論 TorrentBroadcast 的一些細節:
Driver 先把 data 序列化到 byteArray,而後切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB
設置)大小的 data block,每一個 data block 被 TorrentBlock 對象持有。切割完 byteArray 後,會將其回收,所以內存消耗雖然能夠達到 2 * Size(data),但這是暫時的。
完成分塊切割後,就將分塊信息(稱爲 meta 信息)存放到 driver 本身的 blockManager 裏面,StorageLevel 爲內存+磁盤,同時會通知 driver 本身的 blockManagerMaster 說 meta 信息已經存放好。通知 blockManagerMaster 這一步很重要,由於 blockManagerMaster 能夠被 driver 和全部 executor 訪問到,信息被存放到 blockManagerMaster 就變成了全局信息。
以後將每一個分塊 data block 存放到 driver 的 blockManager 裏面,StorageLevel 爲內存+磁盤。存放後仍然通知 blockManagerMaster 說 blocks 已經存放好。到這一步,driver 的任務已經完成。
executor 收到 serialized task 後,先反序列化 task,這時候會反序列化 serialized task 中包含的 bdata 類型是 TorrentBroadcast,也就是去訪問 TorrentBroadcast._value,調用其readBroadcastBlock()
方法。這個方法首先獲得 bdata 對象,**而後發現 bdata 裏面沒有包含實際的 data。怎麼辦?**先詢問本地所在的 executor 裏的 blockManager 是會否包含 data(經過查詢 data 的 broadcastId),包含就直接從本地 blockManager 讀取 data。不然,就經過本地 blockManager 去鏈接 driver 的 blockManagerMaster 獲取 data 分塊的 meta 信息,獲取信息後,就開始了 BT 過程。
**BT 過程:**task 先在本地開一個數組用於存放將要 fetch 過來的 data blocks val blocks = new Array[BlockData](numBlocks)
,而後打亂要 fetch 的 data blocks 的順序,for (pid <- Random.shuffle(Seq.range(0, numBlocks)))
好比若是 data block 共有 5 個,那麼打亂後的 fetch 順序多是 3-1-2-4-5。而後按照打亂後的順序去 fetch 一個個 data block。**每 fetch 到一個 block 就將其存放到 executor 的 blockManager 裏面,同時通知 driver 上的 blockManagerMaster 說該 data block 多了一個存儲地址。**這一步通知很是重要,意味着 blockManagerMaster 知道 data block 如今在 cluster 中有多份,下一個不一樣節點上的 task 再去 fetch 這個 data block 的時候,能夠有兩個選擇了,並且會隨機選擇一個去 fetch。這個過程持續下去就是 BT 協議,隨着下載的客戶端愈來愈多,data block 服務器也愈來愈多,就變成 p2p下載了。關於 BT 協議,Wikipedia 上有一個動畫。
整個 fetch 過程結束後,task 會開一個大 Array[Byte],大小爲 data 的總大小,而後將 data block 都 copy 到這個 Array,而後對 Array 中 bytes 進行反序列化獲得原始的 data,這個過程就是 driver 序列化 data 的反過程。
最後將 data 存放到 task 所在 executor 的 blockManager 裏面,StorageLevel 爲內存+磁盤。顯然,這時候 data 在 blockManager 裏存了兩份,不過等所有 executor 都 fetch 結束,存儲 data blocks 那份能夠刪掉了。
@Andrew-Xia 回答道:不會怎樣,就是這個rdd在每一個executor中實例化一份。
公共數據的 broadcast 是很實用的功能,在 Hadoop 中使用 DistributedCache,好比經常使用的-libjars
就是使用 DistributedCache 來將 task 依賴的 jars 分發到每一個 task 的工做目錄。不過度發前 DistributedCache 要先將文件上傳到 HDFS。這種方式的主要問題是資源浪費,若是某個節點上要運行來自同一 job 的 4 個 mapper,那麼公共數據會在該節點上存在 4 份(每一個 task 的工做目錄會有一份)。可是經過 HDFS 進行 broadcast 的好處在於單點瓶頸不明顯,由於公共 data 首先被分紅多個 block,而後不一樣的 block 存放在不一樣的節點。這樣,只要全部的 task 不是同時去同一個節點 fetch 同一個 block,網絡擁塞不會很嚴重。
對於 Spark 來說,broadcast 時考慮的不只是如何將公共 data 分發下去的問題,還要考慮如何讓同一節點上的 task 共享 data。
對於第一個問題,Spark 設計了兩種 broadcast 的方式,傳統存在單點瓶頸問題的 HttpBroadcast,和相似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用傳統的 client-server 形式的 HttpServer 來傳遞真正的 data,而 TorrentBroadcast 使用 blockManager 自帶的 NIO 通訊方式來傳遞 data。TorrentBroadcast 存在的問題是慢啓動和佔內存,慢啓動指的是剛開始 data 只在 driver 上有,要等 executors fetch 不少輪 data block 後,data server 纔會變得可觀,後面的 fetch 速度纔會變快。executor 所佔內存的在 fetch 完 data blocks 後進行反序列化時須要將近兩倍 data size 的內存消耗。無論哪種方式,driver 在分塊時會有兩倍 data size 的內存消耗。
對於第二個問題,每一個 executor 都包含一個 blockManager 用來管理存放在 executor 裏的數據,將公共數據存放在 blockManager 中(StorageLevel 爲內存+磁盤),能夠保證在 executor 執行的 tasks 可以共享 data。
其實 Spark 以前還嘗試了一種稱爲 TreeBroadcast 的機制,詳情能夠見技術報告 Performance and Scalability of Broadcast in Spark。
更深刻點,broadcast 能夠用多播協議來作,不過多播使用 UDP,不是可靠的,仍然須要應用層的設計一些可靠性保障機制。