Spark共享變量

共享變量python

一般狀況下,當向Spark操做(如map,reduce)傳遞一個函數時,它會在一個遠程集羣節點上執行,它會使用函數中全部變量的副本。這些變量被複制到全部的機器上,遠程機器上並無被更新的變量會向驅動程序回傳。在任務之間使用通用的,支持讀寫的共享變量是低效的。儘管如此,Spark提供了兩種有限類型的共享變量,廣播變量和累加器。程序員

 

廣播變量算法

廣播變量容許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可被用於有效地給每一個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減小通訊的開銷。apache

Spark的動做經過一系列的步驟執行,這些步驟由分佈式的洗牌操做分開。Spark自動地廣播每一個步驟每一個任務須要的通用數據。這些廣播數據被序列化地緩存,在運行任務以前被反序列化出來。這意味着當咱們須要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地建立廣播變量纔有用。編程


經過在一個變量v上調用SparkContext.broadcast(v)能夠建立廣播變量。廣播變量是圍繞着v的封裝,能夠經過value方法訪問這個變量。舉例以下:數組

 

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)

 

在建立了廣播變量以後,在集羣上的全部函數中應該使用它來替代使用v.這樣v就不會不止一次地在節點之間傳輸了。另外,爲了確保全部的節點得到相同的變量,對象v在被廣播以後就不該該再修改。緩存

 

累加器分佈式

累加器是僅僅被相關操做累加的變量,所以能夠在並行中被有效地支持。它能夠被用來實現計數器和總和。Spark原生地只支持數字類型的累加器,編程者能夠添加新類型的支持。若是建立累加器時指定了名字,能夠在Spark的UI界面看到。這有利於理解每一個執行階段的進程。(對於python還不支持)函數

累加器經過對一個初始化了的變量v調用SparkContext.accumulator(v)來建立。在集羣上運行的任務能夠經過add或者"+="方法在累加器上進行累加操做。可是,它們不能讀取它的值。只有驅動程序可以讀取它的值,經過累加器的value方法。this

下面的代碼展現瞭如何把一個數組中的全部元素累加到累加器上:

 

scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10

 

儘管上面的例子使用了內置支持的累加器類型Int,可是開發人員也能夠經過繼承AccumulatorParam類來建立它們本身的累加器類型。AccumulatorParam接口有兩個方法:

zero方法爲你的類型提供一個0值。

addInPlace方法將兩個值相加。

假設咱們有一個表明數學vector的Vector類。咱們能夠向下面這樣實現:

 

object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala裏,Spark提供更通用的累加接口來累加數據,儘管結果的類型和累加的數據類型可能不一致(例如,經過收集在一塊兒的元素來建立一個列表)。同時,SparkContext..accumulableCollection方法來累加通用的Scala的集合類型。

 

累加器僅僅在動做操做內部被更新,Spark保證每一個任務在累加器上的更新操做只被執行一次,也就是說,重啓任務也不會更新。在轉換操做中,用戶必須意識到每一個任務對累加器的更新操做可能被不僅一次執行,若是從新執行了任務和做業的階段。

累加器並無改變Spark的惰性求值模型。若是它們被RDD上的操做更新,它們的值只有當RDD由於動做操做被計算時才被更新。所以,當執行一個惰性的轉換操做,好比map時,不能保證對累加器值的更新被實際執行了。下面的代碼片斷演示了此特性:

 

val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } //在這裏,accum的值仍然是0,由於沒有動做操做引發map被實際的計算.
相關文章
相關標籤/搜索