1.Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)
累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。
共享變量出現的緣由:
一般在向 Spark 傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,
可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。
Spark 的兩個共享變量,累加器與廣播變量,分別爲結果聚合與廣播這兩種常見的通訊模式突破了這一限制。
2. 廣播變量的引入:
Spark 會自動把閉包中全部引用到的變量發送到工做節點上。雖然這很方便,但也很低效。
緣由有二:首先,默認的任務發射機制是專門爲小任務進行優化的;其次,事實上你可能會在多個並
行操做中使用同一個變量,可是 Spark 會爲每一個操做分別發送。
1.用一段代碼來更直觀的解釋:
object BroadcastTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("broadcast")
val sc = new SparkContext(conf)
val list = List("hello java")
val linesRDD = sc.textFile("./word")
linesRDD.filter(line => {
list.contains(line)
}).foreach(println)
sc.stop()
}
}
list是在driver端建立的,可是由於須要在excutor端使用,因此driver會把list以task的形式發送到excutor端,
若是有不少個task,就會有不少給excutor端攜帶不少個list,若是這個list很是大的時候,就可能會形成內存溢出
(以下圖所示)。這個時候就引出了廣播變量。
使用廣播變量後:
使用廣播變量的過程很簡單:
(1) 經過對一個類型 T 的對象調用 SparkContext.broadcast建立出一個 Broadcast[T]對象。任何可序列化的類型均可以這麼實現。
(2) 經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。
(3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。
案例以下:
object BroadcastTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("broadcast")
val sc = new SparkContext(conf)
val list = List("hello java")
val broadcast = sc.broadcast(list)
val linesRDD = sc.textFile("./word")
linesRDD.filter(line => {
broadcast.value.contains(line)
}).foreach(println)
sc.stop()
}
}
2.注意事項:
能不能將一個RDD使用廣播變量廣播出去?
不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。
廣播變量只能在Driver端定義,不能在Executor端定義。
3.在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。
object AccumulatorTest{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("broadcast")
val sc = new SparkContext(conf)
val linesRDD = sc.textFile("./word")
var i=0;
val result = linesRDD.map(s=>{
i += 1
s
})
result.collect()
println("word lines is :"+i)
sc.stop()
}
}
//咱們發現打印的結果爲
//word lines is :0
依然是driver和excutor端的數據不能共享的問題。excutor端修改了變量,根本不會讓driver端跟着修改,
這個就是累加器出現的緣由。
3.累加器:
做用:提供了將工做節點中的值聚合到驅動器程序中的簡單語法。(以下圖)
經常使用場景:調試時對做業執行過程當中的事件進行計數。
1. 累加器的用法以下所示:
(1)經過在driver中調用 SparkContext.accumulator(initialValue) 方法,建立出存有初始值的累加器。返回值爲
org.apache.spark.Accumulator[T] 對象,其中 T 是初始值initialValue 的類型。
(2)Spark閉包(函數序列化)裏的excutor代碼可使用累加器的 += 方法(在Java中是 add )增長累加器的值。
(3)driver程序能夠調用累加器的 value 屬性(在 Java 中使用 value() 或 setValue() )來訪問累加器的值。
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator("AccumulatorTest"); //建立accumulator並初始化爲0
val linesRDD = sc.textFile("./sp")
val result = linesRDD.map(s => {
accumulator.add(1) //有一條數據就增長1
s
})
result.collect();
println("words lines is :" + accumulator.value)
sc.stop()
}
}
// 輸出結果:
//words lines is :3
2.注意事項
累加器在Driver端定義賦初始值,累加器只能在Driver端讀取,在Excutor端更新(以下圖)。