Spark共享變量(廣播變量、累加器)

1.Spark兩種共享變量:廣播變量(broadcast variable)與累加器(accumulator)

累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。
共享變量出現的緣由:
    一般在向 Spark 傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,
可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。
Spark 的兩個共享變量,累加器與廣播變量,分別爲結果聚合與廣播這兩種常見的通訊模式突破了這一限制。

2. 廣播變量的引入

Spark 會自動把閉包中全部引用到的變量發送到工做節點上。雖然這很方便,但也很低效。
緣由有二:首先,默認的任務發射機制是專門爲小任務進行優化的;其次,事實上你可能會在多個並
行操做中使用同一個變量,可是 Spark 會爲每一個操做分別發送。

1.用一段代碼來更直觀的解釋:

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("broadcast")
    val sc = new SparkContext(conf)
    val list = List("hello java")
    val linesRDD = sc.textFile("./word")
    linesRDD.filter(line => {
      list.contains(line)
    }).foreach(println)
    sc.stop()
  }
}
list是在driver端建立的,可是由於須要在excutor端使用,因此driver會把list以task的形式發送到excutor端,
若是有不少個task,就會有不少給excutor端攜帶不少個list,若是這個list很是大的時候,就可能會形成內存溢出
(以下圖所示)。這個時候就引出了廣播變量。


使用廣播變量後:


使用廣播變量的過程很簡單:
(1) 經過對一個類型 T 的對象調用 SparkContext.broadcast建立出一個 Broadcast[T]對象。任何可序列化的類型均可以這麼實現。
(2) 經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。
(3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。
案例以下
object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("broadcast")
    val sc = new SparkContext(conf)
    val list = List("hello java")
    val broadcast = sc.broadcast(list)
    val linesRDD = sc.textFile("./word")
    linesRDD.filter(line => {
      broadcast.value.contains(line)
    }).foreach(println)
    sc.stop()
  }
}

2.注意事項:

能不能將一個RDD使用廣播變量廣播出去?
    不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。
廣播變量只能在Driver端定義,不能在Executor端定義。

3.在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值
 

object AccumulatorTest{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("broadcast")
    val sc = new SparkContext(conf)
    val linesRDD = sc.textFile("./word")
    var i=0;
    val result = linesRDD.map(s=>{
      i += 1
      s
    })
    result.collect()
    println("word lines is :"+i)
    sc.stop()
  }
}
//咱們發現打印的結果爲
//word lines is :0
依然是driver和excutor端的數據不能共享的問題。excutor端修改了變量,根本不會讓driver端跟着修改,
這個就是累加器出現的緣由。

3.累加器:

做用:提供了將工做節點中的值聚合到驅動器程序中的簡單語法。(以下圖)


經常使用場景:調試時對做業執行過程當中的事件進行計數。

1. 累加器的用法以下所示:

(1)經過在driver中調用 SparkContext.accumulator(initialValue) 方法,建立出存有初始值的累加器。返回值爲 
org.apache.spark.Accumulator[T] 對象,其中 T 是初始值initialValue 的類型。

(2)Spark閉包(函數序列化)裏的excutor代碼可使用累加器的 += 方法(在Java中是 add )增長累加器的值。

(3)driver程序能夠調用累加器的 value 屬性(在 Java 中使用 value() 或 setValue() )來訪問累加器的值。

object  AccumulatorTest  {
 def  main(args: Array[String]): Unit = {
  val  conf  = new  SparkConf().setMaster("local").setAppName("accumulator")
  val  sc  = new  SparkContext(conf)
  val  accumulator  = sc.longAccumulator("AccumulatorTest"); //建立accumulator並初始化爲0
  val  linesRDD  = sc.textFile("./sp")
  val  result  = linesRDD.map(s  => {
   accumulator.add(1) //有一條數據就增長1
   s
  })
  result.collect();
  println("words lines is :"  +  accumulator.value)
  sc.stop()
 }
}
// 輸出結果:
//words lines is :3

 2.注意事項

累加器在Driver端定義賦初始值,累加器只能在Driver端讀取,在Excutor端更新(以下圖)。
相關文章
相關標籤/搜索