Spark中由於算子中的真正邏輯是發送到Executor中去運行的,因此當Executor中須要引用外部變量時,須要使用廣播變量。
進一步解釋:html
A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(T, scala.reflect.ClassTag<T>). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The interpreter session below shows this:
[翻譯]它是一個廣播變量。廣播變量容許程序員在每臺計算機上緩存只讀變量,而不是將其副本與任務一塊兒發送。例如,它們能夠有效地爲每一個節點提供一個大型輸入數據集的副本。Spark還嘗試使用有效的廣播算法來分配廣播變量,以下降通訊成本。
廣播變量是經過調用sparkcontext.broadcast(t,scala.reflect.classtag<t>)從變量v建立的。廣播變量是圍繞v的包裝器,它的值能夠經過調用value方法來訪問。下面的口譯員會話顯示了這一點:java
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int} = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
[翻譯]建立廣播變量後,應在集羣上運行的任何函數中使用它,而不是使用值v,這樣V就不會屢次發送到節點。此外,對象v在廣播後不該進行修改,以確保全部節點都得到廣播變量的相同值(例如,若是變量稍後被髮送到新節點)。node
package org.apache.spark.broadcast import java.io.Serializable import scala.reflect.ClassTag import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** * A broadcast variable. * 。。。。。。 * @param id A unique identifier for the broadcast variable. * @tparam T Type of the data contained in the broadcast variable. */ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging { /** * Flag signifying whether the broadcast variable is valid(that is, not already destroyed) or not. * 指示廣播變量是否有效的標誌(即,還沒有銷燬)或未銷燬。 */ @volatile private var _isValid = true private var _destroySite = "" /** Get the broadcasted value. */ def value: T = { assertValid() getValue() } /** * Asynchronously delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor. * 異步刪除執行器上此廣播的緩存副本。若是在調用後使用廣播,則須要將其從新發送給每一個執行者。 */ def unpersist() { unpersist(blocking = false) } /** * Delete cached copies of this broadcast on the executors. If the broadcast is used after this is called, it will need to be re-sent to each executor. * 刪除執行器上此廣播的緩存副本。若是在調用後使用廣播,則須要將其從新發送給每一個執行者。 * @param blocking Whether to block until unpersisting has completed 是否阻止直到unpersist完成 */ def unpersist(blocking: Boolean) { assertValid() doUnpersist(blocking) } /** * Destroy all data and metadata related to this broadcast variable. Use this with caution; once a broadcast variable has been destroyed, it cannot be used again. This method blocks until destroy has completed * 銷燬與此廣播變量相關的全部數據和元數據。當心使用;廣播變量一旦被銷燬,就不能再使用。此方法在銷燬完成以前阻止 */ def destroy() { destroy(blocking = true) } /** * Destroy all data and metadata related to this broadcast variable. Use this with caution; once a broadcast variable has been destroyed, it cannot be used again. * 銷燬與此廣播變量相關的全部數據和元數據。當心使用;廣播變量一旦被銷燬,就不能再使用。 * @param blocking Whether to block until destroy has completed 是否阻止直到銷燬完成 */ private[spark] def destroy(blocking: Boolean) { assertValid() _isValid = false _destroySite = Utils.getCallSite().shortForm logInfo("Destroying %s (from %s)".format(toString, _destroySite)) doDestroy(blocking) } /** * Whether this Broadcast is actually usable. This should be false once persisted state is removed from the driver. * 廣播變量是否實際可用。一旦持久化狀態從driver中被移除將會返回false. */ private[spark] def isValid: Boolean = { _isValid } /** * Actually get the broadcasted value. Concrete implementations of Broadcast class must define their own way to get the value. * 實際獲取廣播值。廣播類的具體實現必須定義本身的方法來獲取值。 */ protected def getValue(): T /** * Actually unpersist the broadcasted value on the executors. Concrete implementations of Broadcast class must define their own logic to unpersist their own data. */ protected def doUnpersist(blocking: Boolean) /** * Actually destroy all data and metadata related to this broadcast variable. Implementation of Broadcast class must define their own logic to destroy their own state. */ protected def doDestroy(blocking: Boolean) /** Check if this broadcast is valid. If not valid, exception is thrown. */ protected def assertValid() { if (!_isValid) { throw new SparkException("Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite)) } } override def toString: String = "Broadcast(" + id + ")" }
其中隱含的概念:
1)broadcast的定義必須在Driver端,不能再executor端定義;
2)調用unpersist(),unpersist(boolean blocking),destroy(),distroy(boolean blocking)方法這些方法必須在driver端調用。
3)在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。程序員具體操做步驟:
a.在driver端調用unpersist(true)方法;
b.對該broadcast實例對象進行從新賦值。
參考《Spark2.2(三十三):Spark Streaming和Spark Structured Streaming更新broadcast總結(一)》、《Spark2.3(四十二):Spark Streaming和Spark Structured Streaming更新broadcast總結(二)》算法
注意事項
一、能不能將一個RDD使用廣播變量廣播出去?
不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。
二、廣播變量只能在Driver端定義,不能在Executor端定義。
三、在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。
四、若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
五、若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。apache