閉包是一個函數,返回值依賴於聲明在函數外部的一個或多個變量。閉包一般來說能夠簡單的認爲是能夠訪問一個函數裏面局部變量的另一個函數。json
以下面這段匿名的函數:閉包
val multiplier = (i:Int) => i * 10
函數體內有一個變量 i,它做爲函數的一個參數。以下面的另外一段代碼:併發
val multiplier = (i:Int) => i * factor
在 multiplier
中有兩個變量:i 和 factor。其中的一個 i 是函數的形式參數,在 multiplier
函數被調用時,i 被賦予一個新的值。然而,factor不是形式參數,而是自由變量,考慮下面代碼:函數
var factor = 3 val multiplier = (i:Int) => i * factor
這裏咱們引入一個自由變量 factor
,這個變量定義在函數外面。大數據
這樣定義的函數變量 multiplier
成爲一個"閉包",由於它引用到函數外面定義的變量,定義這個函數的過程是將這個自由變量捕獲而構成一個封閉的函數人工智能
完整的例子:spa
object Test { def main(args: Array[String]) { println( "muliplier(1) value = " + multiplier(1) ) println( "muliplier(2) value = " + multiplier(2) ) } var factor = 3 val multiplier = (i:Int) => i * factor }
先來看下面一段代碼:scala
val data=Array(1, 2, 3, 4, 5) var counter = 0 var rdd = sc.parallelize(data) // ???? 這樣作會怎麼樣 rdd.foreach(x => counter += x) println("Counter value: " + counter)
首先確定的是上面輸出的結果是0,park將RDD操做的處理分解爲tasks,每一個task由Executor
執行。在執行以前,Spark會計算task的閉包。閉包是Executor
在RDD上進行計算的時候必須可見的那些變量和方法(在這種狀況下是foreach())。閉包會被序列化併發送給每一個Executor,可是發送給Executor的是副本,因此在Driver上輸出的依然是counter
自己,若是想對全局的進行更新,用累加器,在spark-streaming
裏面使用updateStateByKey
來更新公共的狀態。code
另外在Spark中的閉包還有別的做用,orm
1.清除Driver發送到Executor上的無用的全局變量等,只複製有用的變量信息給Executor
2.保證發送到Executor上的是序列化之後的數據
好比在使用DataSet時候 case class的定義必須在類下,而不能是方法內,即便語法上沒問題,若是使用過json4s來序列化,implicit val formats = DefaultFormats
的引入最好放在類下,不然要單獨將這個format序列化,即便你沒有使用到它別的東西。
閉包在Spark的整個生命週期中到處可見,就好比從Driver
上拷貝的全部數據都須要序列化 + 閉包的方式到Executor
上的。
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注