Spark2.x(六十二):(Spark2.4)共享變量 - Broadcast原理分析

以前對Broadcast有分析,可是不夠深刻《Spark2.3(四十三):Spark Broadcast總結》,本章對其實現過程以及原理進行分析。html

帶着如下幾個問題去寫本篇文章:java

1)driver端如何實現broadcast的裝備,是否會把broadcast數據發送給executor端?node

2)executor如何獲取到broadcast數據?git

導入

Spark一個很是重要的特徵就是共享變量。共享變量分爲廣播變量(broadcast variable)和累加器(Accumulators)。程序員

  • 廣播變量能夠在driver程序中寫入,在executor端讀取。
  • 累加器在executors中寫入,而在驅動程序(driver端)讀取。

但本章只講解broadcast變量github

Spark官網「共享變量簡介請參考:算法

http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variablesapache

一般,當傳遞給Spark算子(好比map或reduce)函數在遠程集羣節點上執行時,它在函數中使用的全部變量的單獨副本上工做。這些變量被複制到每臺服務器上,對遠程服務器上變量的任何更新都不會傳播回driver程序。一般支持跨Tasks的讀寫共享變量性能比較低。也就是說若是在一個算子函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每一個task中。此時每一個task只能操做本身的那份變量副本。若是多個task想要共享某個變量,那麼這種方式是作不到的。緩存

然而,Spark確實爲兩種常見的使用模型提供了兩種有限類型的共享變量:廣播變量和累加器。安全

廣播變量的概念

廣播變量提供了一種方法,能夠在driver端獲取本地數據,並將它的只讀副本分發給運行有executor(與driver一塊兒構成一個應用程序的executor,不是另一個應用程序的executor)的節點服務器,而不是爲每一個Task發送新副本。廣播變量容許程序員在每臺計算機上緩存只讀變量,而不是將其副本與Tasks一塊兒發送。例如,它們能夠有效地爲每一個節點提供一個大型輸入數據集的副本。Spark還嘗試使用有效的廣播算法來分發廣播變量,以下降通訊成本。Spark將值傳遞給Executor(一次),當在Executor中屢次使用廣播變量時,Task能夠共享它而不會致使重複的網絡傳輸。

Spark應用程序做業的執行由一系列Stage構成,由分佈式「shuffle」操做分隔。Spark自動廣播每一個Stage中Tasks所需的公共數據。以這種方式廣播的數據以序列化形式緩存,並在運行每一個Task以前進行反序列化。這意味着,僅當跨多個Stage的Tasks須要相同的數據或以反序列化形式緩存數據時,顯式建立廣播變量纔有用。

廣播變量是經過調用sparkContext.broadcast(v)從變量v建立的。廣播變量是圍繞v的包裝器,它的值能夠經過調用value方法來訪問。使用示例:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

建立廣播變量後,應在集羣上運行的任何函數中使用它,而不是使用值v,這樣V就不會屢次發送到節點。此外,對象v在廣播後不該進行修改,以確保全部節點都得到廣播變量的相同值(例如,若是變量稍後被髮送到新節點)。

使用廣播變量的場景示例:

  • 大表join小表時,將小表廣播掉,以(broadcast join)方式運行,實現優化。
  • 廣播機器學習模型以便可以對咱們的數據進行預測

經過SparkContext#broadcast(v)來建立廣播變量,返回的廣播變量其實就是一個實現了Broadcast接口的包裝類(TorrentBroadcast)對象。在使用時,須要在executor上運行的算子(map/reduce等)內部會引用該broadcast包裝的廣播變量。

另外建立後在driver端修改廣播變量中的值,而不作其餘操做時,executor是沒法拿到driver端修改後的數據的。若是要修改建立後的廣播變量,請參考《Spark2.3(四十二):Spark Streaming和Spark Structured Streaming更新broadcast總結(二)》。

廣播替代方案

其實咱們能夠在閉包中捕獲局部變量,以便將數據從driver程序傳輸到worker,但這種方式是每一個Task都發送一個副本變量,相對於broadcast方式比較耗費網絡帶寬和內存空間。

另外咱們也能夠在Executor端採用單例的方式去加載資源文件,來替代broadcast方案,可是這種方案與broadcast仍是有區別:

  1. broadcast不涉及到當executor多個vcore時,並行task之間同時加載單例資源數據線程安全問題(固然單利中能夠經過synchronized同步實現線程安全,首次加載並行線程排隊問題會涉及到效率問題);
  2. broadcast比較完美狀況下,在executor的節點服務器上只有一個executor從driver端經過remote方式讀取副本數據,該節點服務器上的其餘executor從local讀取資源,讀取不到時纔會remote讀取資源;但單例方式每一個executor都須要加載一次資源;
  3. 存儲結構不一樣,broadcast是在spark2.0+內存結構中存儲在Storage Memory這塊內存區上,而單例方式的數據是存放到「用戶自定義數據結構」Other內存區上。

使用時注意事項

1)broadcast的定義必須在Driver端,不能再executor端定義;

2)調用unpersist(),unpersist(boolean blocking),destroy(),distroy(boolean blocking)方法這些方法必須在driver端調用。

3)在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。

具體操做步驟:
  a.在driver端調用unpersist(true)方法;
  b.對該broadcast實例對象進行從新賦值。
參考《Spark2.2(三十三):Spark Streaming和Spark Structured Streaming更新broadcast總結(一)》、《Spark2.3(四十二):Spark Streaming和Spark Structured Streaming更新broadcast總結(二)

4)能不能將一個RDD使用廣播變量廣播出去?
   不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。

5)若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

6)若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。

7)使用時,算子函數內部引用的是broadcast包裝類對象,而不是broadcast.getValue()的值。不然,毫無心義,每一個task都會存在一份副本。

Broadcast的實現

首先咱們咱們看下broadcast的類結構:

SparkContext類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/SparkContext.scala,是broadcast使用的入口函數。

Broadcast接口類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala,是broadcast包裝類的接口類。

BroadcastFactory接口類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala,是broadcast包裝的工廠類接口。

TorrentBroadcast類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala,是broadcast包裝類的惟一實現類。

TorrentBroadcastFactory類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala,是broadcast包裝的工廠類接口的惟一實現類。

BroadcastManager類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala,broadcast管理類,在SparkEnv中引用,SparkContext#broadcast中調用就是SparkContext#env#broadcastManager的。

BlockManager類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala,blockManager是env中引用對block的管理類,它會把數據根據存儲級別來存儲數據。若是是memory存儲則調用memoryStore來實現存儲,若是是磁盤存儲則使用diskStore來實現存儲。

MemoryStore類:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala,內存存儲實現類。讀、寫操做。

DiskStore類結構:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/storage/DiskStore.scala,磁盤存儲實現類。讀、寫操做。

SparkContext中broadcast(v)函數

用來建立broadcast variable的broadcasat(v)函數定義在SparkContext中,SparkContext中broadcas(v)t代碼以下:

   /**
   * Broadcast a read-only variable to the cluster, returning a
   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
   * The variable will be sent to each cluster only once.
   *
   * @param value value to broadcast to the Spark nodes
   * @return `Broadcast` object, a read-only variable cached on each machine
   */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

調用了SparkContext#broadcast(v)會建立一個廣播變量,並返回一個org.apache.spark.broadcast.Broadcast對象,這樣就能夠分佈式函數中來讀取該廣播變量的值。該變量會被髮送到各個存在executor的節點上。

Broadcast()函數的實現流程以下:

1)判斷須要廣播的變量是不是分佈式變量,如果則會終止函數,報告"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.」的錯誤。

2)經過BroadcastManger#newBroadcast函數來建立廣播變量,並返回一個Broadcast對象,Broadcast只是一個接口類,真正返回的是TorrentBroadcast類的對象。

3)註冊broadcast的cleanup函數,實際上至關於註冊到ContextCleaner中,後邊ContextCleaner內部有一個線程會默認每30分鐘觸發一次對弱引用的broadcast等的清理工做。

4)返回新建立的實現了Broadcast接口的類對象(或者說,返回broadcast包裝類對象)。

BroadcastManager類

BroadcastManager在SparkContext#broadcast(v)中被使用到,所以咱們沿着這個路線,來看下這個BroadcastManager的實現類。它是一個broadcast的管理類,用來統一建立broadcast對外的接口:建立,銷燬。

BroadcastManager的類定義以下:

private[spark] class BroadcastManager(
    val isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager)
  extends Logging {

  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  initialize()

  // Called by SparkContext or Executor before using Broadcast
  private def initialize() {
    synchronized {
      if (!initialized) {
        broadcastFactory = new TorrentBroadcastFactory
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  def stop() {
    broadcastFactory.stop()
  }

  private val nextBroadcastId = new AtomicLong(0)

  private[broadcast] val cachedValues = {
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
  }

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}

該類的構造函數流程以下:

 // 是否已經初始
 private var initialized = false
 private var broadcastFactory: BroadcastFactory = null
 
 initialize()
 
 // 生成廣播變量的id,該id是惟一的,這裏先初始化,會在建立broadcast變量時進行自增操做
 private val nextBroadcastId = new AtomicLong(0)

1)定義了兩個proviate的變量:initialized,nextBroadcastId。其中nextBroadcastId爲每一個廣播變量生成一個惟一的id,在建立broadcast變量時,會經過nextBroadcastId,getAndIncrement()進行自增,並調用initialize()函數來進行初始化。

2)initialize()函數的實現邏輯以下:

  // Called by SparkContext or Executor before using Broadcast
  private def initialize() {
    synchronized {  // 加鎖
      if (!initialized) {
        // 初始化broadcastFactory變量,這裏建立了TorrentBroadcastFactory對象
        broadcastFactory = new TorrentBroadcastFactory
        // 調用TorrentBroadcastFactory的initialize函數來初始化
        broadcastFactory.initialize(isDriver, conf, securityManager)
        // 把initialized設置爲true,同一個對象只初始化一次
        initialized = true
      }
    }
  }

2.1)初始化broadcastFactory變量,這裏建立了TorrentBroadcastFactory對象;

2.2)調用TorrentBroadcastFactory#initialize()進行初始化,能夠查閱TorrentBroadcastFactory,會發現這個initialize()方法實現爲空,什麼也不作;

2.3)把initialize變量設置爲true,並結合synchronized實現broadcastFactory只能被實例化一次。

BroadcastManager中定義了unbroadcast()方法

實現broadcast清理功能:刪除executor上與此TorrentBroadcast關聯的全部持久化塊。若是removeFromDriver爲true,也要刪除driver程序上的這些持久block。

  1. id: Long, 這裏指broadcatId
  2. removeFromDriver: Boolean, 是否從driver端移除broadcast持久化block數據,不然driver端會殘留broadcast持久化block數據。
  3. blocking: Boolean,是否同步等待完成才返回。若是false,則會異步的刪除executor上broadcast副本數據。

BroadcastManager中定義了newBroadcast()方法

該方法是SparkConetxt#broadcast()中調用用來建立broadcast對象使用的,那麼咱們看下其具體實現:

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

1)根據上邊分析,在BroadcastManager初始化時,broadcastFactory給賦值爲TorrentBroadcastFactory,那麼這個broadcastFactory其實就是TorrentBroadcastFactory的實例對象;

2)分析到這裏,咱們能夠清楚的知道真正建立broadcast包裝類的實現是在TorrentBroadcastFactory#newBroadcast()實現的,那麼接下來咱們來查閱TorrentBroadcastFactory的實現。

TorrentBroadcastFactory類

該類實現了一個相似於BitTorrent的協議,經過該協議把廣播數據分發到各個executor中。這些操做實際上是在類TorrentBroadcast中實現的。

比特流(BitTorrent)是一種內容分發協議,由布拉姆·科恩自主開發(2003年,軟件工程師Bram Cohen發明了BitTorrent協議)。它採用高效的軟件分發系統和點對點技術共享大致積文件(如一部電影或電視節目),並使每一個用戶像網絡從新分配結點那樣提供上傳服務。通常的下載服務器爲每個發出下載請求的用戶提供下載服務,而BitTorrent的工做方式與之不一樣。分配器或文件的持有者將文件發送給其中一名用戶,再由這名用戶轉發給其它用戶,用戶之間相互轉發本身所擁有的文件部分,直到每一個用戶的下載都所有完成。這種方法可使下載服務器同時處理多個大致積文件的下載請求,而無須佔用大量帶寬。

TorrentBroadcastFactory的代碼定義以下:

/**
 * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
 * protocol to do a distributed transfer of the broadcasted data to the executors. Refer to
 * [[org.apache.spark.broadcast.TorrentBroadcast]] for more details.
 */
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }

  override def stop() { }

  /**
   * Remove all persisted state associated with the torrent broadcast with the given ID.
   * @param removeFromDriver Whether to remove state from the driver.
   * @param blocking Whether to block until unbroadcasted
   */
  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
  }
}

TorrentBroadcast類

TorrentBroadcasat是真正廣播變量操做實現類

該類實現機制:

1)driver端將序列化對象分紅多個小block,並將這些block存儲到driver端的blockManager中,blockManager是上會根據當前存儲類型(MEMORY_AND_DISK_SER,這也是當driver內存不夠時,driver也不內存溢出的緣由)去存儲,優先使用內存存儲,調用MemoryStore來存儲,若是內存不夠,則使用DiskStore來進行磁盤存儲。

2)在每一個executor上,executor使用到broadcast對象時,會嘗試從當前executor的blockManager中獲取數據,若不存在,則遠程從driver或其餘executor(若是可用)中獲取對象block,一旦獲取到block,它就會將block存放到broadcastCache中,爲其餘executor來獲取數據作好準備。

3)經過這種方式,能夠防止driver成爲(向每一個executor)發送廣播副本的瓶頸。若是driver向每一個executor都要發送廣播副本則會致使driver網絡帶寬成爲瓶頸,效率也會比較低。

代碼分析:

該類的構造函數:

1)經過readBroadcastBlock函數構造broadcast,並存放到broadcastCache中。在driver端,若須要value值,它會直接從本地的block manager中讀取數據。readBroadcastBlock函數的實現邏輯以下:

 readBlock業務:

 

 獲取流程爲:嘗試從當前executor的blockManager中獲取數據,若不存在,則遠程從driver或其餘executor(若是可用)中獲取對象block,獲取到後存放到broadcastCache中。

1)從當前executor本地獲取數據塊:SparkEnv.get.broadcastManager.cachedValues獲取對應broadcastId的數據塊值:broadcastCache.get(broadcastId)
2)從blockManager中獲取對應id的廣播變量的值:blockManager.getLocalValues(broadcastId)
3)若從blockManager中獲取到了該變量的值,則:broadcastCache.put(broadcastId, x)
4)若不能從blockManager中獲取值,則調用readBlocks函數來讀取數據塊。該函數會從driver或其餘的executors中讀取該變量的數據。

2)設置配置信息:setConf(SparkEnv.get.conf)

3)初始化廣播變量的惟一值:private val broadcastId=BraodcastBlockId(id)

4)調用writeBlocks把廣播變量劃分紅多個塊,並保存到blockManager中。

注意:其中該類中有些屬性被@transient修飾的,被@transient修飾的屬性不能被序列化。

Executor端如何獲取broadcast值

從實際開發中,咱們常先定義的broadcast,而後把broadcast對象傳遞給算子(好比:MapFunction)內部調用。傳遞給算子時,是傳遞的broadcast數據的包裝類對象(實際上就是TorrentBroadcast對象),在內部經過broadcast#getValue()獲取broadcast值。舉例說明:

        StructType resulStructType = new StructType();
        resulStructType = resulStructType.add("int_id", DataTypes.StringType, false);
        resulStructType = resulStructType.add("job_result", DataTypes.StringType, true);
        ExpressionEncoder<Row> resultEncoder = RowEncoder.apply(resulStructType);

        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext().getConf());
        Map<String, String> resource = new java.util.HashMap<String, String>();
        for (int i = 0; i < 10000; i++) {
            resource.put(String.valueOf(i), String.valueOf(i));
        }
        Broadcast<Map<String, String>> broadcastMap = javaSparkContext.broadcast(resource);
        
        sourceDataset = sourceDataset.map(new MapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Row row) throws Exception {
                int int_id_idx = row.schema().fieldIndex("int_id");
                Object int_idObject = row.get(int_id_idx);
                String int_id = int_idObject.toString();
Map
<String, String> resources = broadcastMap.getValue(); String job_result = resources.get(int_id); Object[] values = new Object[2]; values[0] = int_id; values[1] = job_result; return RowFactory.create(values); } }, resultEncoder); sourceDataset.printSchema();

那麼,咱們接下來從源碼分析下,這個TorrentBroadcast#getValue()方法是如何實現的:

private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  /**
   * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
   * which builds this value by reading blocks from the driver and/or other executors.
   *
   * On the driver, if the value is required, it is read lazily from the block manager.
   */
  @transient private lazy val _value: T = readBroadcastBlock()

  /** The compression codec to use, or None if compression is disabled */
  @transient private var compressionCodec: Option[CompressionCodec] = _
  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
  @transient private var blockSize: Int = _

  private def setConf(conf: SparkConf) {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)

  private val broadcastId = BroadcastBlockId(id)

  /** Total number of blocks this broadcast variable contains. */
  private val numBlocks: Int = writeBlocks(obj)

  /** Whether to generate checksum for blocks or not. */
  private var checksumEnabled: Boolean = false
  /** The checksum for all the blocks. */
  private var checksums: Array[Int] = _

  override protected def getValue() = {
    _value
  }
。。。
}

從上邊代碼中咱們能夠知道:

1)傳遞給executor的算子(相似MapFunction)的broadcast包裝對象就是TorrentBroadcast對象。

2)「Broadcast包裝對象」傳遞給executor時,攜帶的傳遞屬性包含TorrentBroadcast的broadcastId,numBlocks等(除被@transient修飾外的屬性):

在executor端獲取值是經過「Broadcast包裝對象」#getValue()方法獲取值得,driver端傳遞過去的只是TorrentBroadcast對象的序列化字符串(被task引用的,所以隨task而傳遞[這點可質疑],是否是broadcast有特殊機制對task引用的broadcast對象採用單獨傳遞方式?),當executor端反序列化task時,會把TorrentBroadcast反序列化爲對象,經過調用broadcast#getValue()方法獲取broadcast的值。

TorrentBroadcast在driver端被序列化時,只能對TorrentBroadcast的非@transient 修飾的屬性進行序列化(被@transient修飾的屬性不能被序列化),由於TorrentBroadcast的_value是被@transient修飾,因此broadcast的值在driver端並不能被反序列化傳遞給executor。

這也是實現driver端傳遞broadcast給executor時,只傳遞broadcastId,numBlocks等屬性(除去_value值外)的真正實現。

3)在executor端,執行map算子時,會調用TorrentBroadcast對象的getVlaue()方法,此時其實是讀取_value=readBroadcastBlock(),也就是嘗試從當前executor的blockManager讀取,當前executor中讀取不到則從當前節點其餘executor中獲取,若是仍是獲取不到則從遠程driver讀取。
斷點監控結果截圖:

 

 

參考

spark2原理分析-廣播變量(Broadcast Variables)的實現原理

RDD Programming Guide

SparkSQL中的三種Join及其具體實現(broadcast join、shuffle hash join和sort merge join)

Spark SQL中的broadcast join分析

相關文章
相關標籤/搜索