此次介紹前面沒有說起的 Spark 編程的各類進階特性,會介紹兩種類型的共享變量:累加器(accumulator)與廣播變量(broadcast variable)。累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。在已有的 RDD 轉化操做的基礎上,咱們爲相似查詢數據庫這樣須要很大配置代價的任務引入了批操做。爲了擴展可用的工具範圍,還會簡單介紹 Spark 與外部程序交互的方式,好比如何與用 R 語言編寫的腳本進行交互。java
當任務須要很長時間進行配置,譬如須要建立數據庫鏈接或者隨機數生成器時,在多個數據元素間共享一次配置就會比較有效率,因此還會討論如何基於分區進行操做以重用數據庫鏈接的配置工做。程序員
一般在向 Spark 傳遞函數時,好比使用 map() 函數或者用 filter() 傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。Spark 的兩個共享變量,累加器與廣播變量,分別爲結果聚合與廣播這兩種常見的通訊模式突破了這一限制。算法
第一種共享變量,即累加器,提供了將工做節點中的值聚合到驅動器程序中的簡單語法。累加器的一個常見用途是在調試時對做業執行過程當中的事件進行計數。例如咱們想知道輸入文件中有多少空行。數據庫
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Spark_6 { def main(args: Array[String]): Unit = { // 在Scala中累加空行 val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("WARN") // 設置日誌顯示級別 val input = sc.textFile("words.txt") val blankLines = sc.accumulator(0) // 建立Accumulator[Int]並初始化爲0 val callSigns = input.flatMap(line => { if(line==""){ blankLines += 1 // 累加器加1 } line.split(" ") }) callSigns.saveAsTextFile("test") // 注意只有在運行saveAsTextFile()行動操做後才能看到正確的計數 Dubug看不到flatMap運行過程 println("Blank Lines: "+blankLines.value) } }
在這個示例中,咱們建立了一個叫做 blankLines 的 Accumulator[Int] 對象,而後在輸入中看到一個空行時就對其加 1。執行完轉化操做以後,就打印出累加器中的值。注意,只有在運行 saveAsTextFile() 行動操做後才能看到正確的計數,由於行動操做前的轉化操做flatMap() 是惰性的,因此做爲計算副產品的累加器只有在惰性的轉化操做 flatMap() 被saveAsTextFile() 行動操做強制觸發時纔會開始求值。apache
固然,也可使用 reduce() 這樣的行動操做將整個 RDD 中的值都聚合到驅動器中。只是咱們有時但願使用一種更簡單的方法來對那些與 RDD 自己的範圍和粒度不同的值進行聚合。聚合能夠發生在 RDD 進行轉化操做的過程當中。在前面的例子中,咱們使用累加器在讀取數據時對空行進行計數,而沒有分別使用 filter() 和 reduce() 。
總結起來,累加器的用法以下所示。
• 經過在驅動器中調用 SparkContext.accumulator(initialValue) 方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值initialValue 的類型。
• Spark閉包裏的執行器代碼可使用累加器的 += 方法(在Java中是 add )增長累加器的值。
• 驅動器程序能夠調用累加器的 value 屬性(在 Java 中使用 value() 或 setValue() )來訪問累加器的值。
注意,工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。在這種模式下,累加器的實現能夠更加高效,不須要對每次更新操做進行復雜的通訊。累加器在Driver端定義賦初始值,累加器只能在Driver端讀取,在Excutor端更新。下圖簡單說明了什麼是Driver(驅動器)端,什麼是Excutor端。編程
Spark 的第二種共享變量類型是廣播變量,它可讓程序高效地向全部工做節點發送一個較大的只讀值,以供一個或多個 Spark 操做使用。好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。數組
廣播變量容許程序員保持只讀變量,在每一個機器上緩存,而不是用任務來發送它的副本。它們能夠有效的方式給每一個節點提供一個大的輸入數據集的副本。spark嘗試使用高效廣播算法來分發廣播變量以減小通訊成本。注意,對象在廣播後不該修改以確保全部節點得到廣播變量的相同值 。緩存
Broadcast 就是將數據從一個節點發送到其餘的節點上; 例如 Driver 上有一張表,而 Executor 中的每一個並行執行的Task (100萬個Task) 都要查詢這張表的話,那咱們經過 Broadcast 的方式就只須要往每一個Executor 把這張表發送一次就好了,Executor 中的每一個運行的 Task 查詢這張惟一的表,而不是每次執行的時候都從 Driver 中得到這張表!網絡
Broadcast 是分佈式的共享數據,默認狀況下只要程序在運行 Broadcast 變量就會存在,由於 Broadcast 在底層是經過 BlockManager 管理的,Broadcast 是在建立 SparkContext 時被建立的!你也能夠手動指定或者配置具體週期來銷燬 Broadcast 變量!Broadcast 通常用於處理共享配置文件,通用的數據子,經常使用的數據結構等等;可是不適合存放太大的數據在Broadcast。Broadcast 不會內存溢出,由於其數據的保存的 Storage Level 是 MEMORY_AND_DISK 的方式(首先優先放在內存中,若是內存不夠才放在磁盤上)雖然如此,咱們也不能夠放入太大的數據在 Broadcast 中,由於網絡 I/O 和可能的單點壓力會很是大!
沒有廣播的狀況:經過網絡傳輸把變量發送到每個 Task 中,會產生4個Number的數據副本,每一個副本都會佔用必定的內存空間,若是變量比較大,會致使則極易出現OOM。 數據結構
使用廣播的狀況:經過Broadcast把變量傳輸到Executor的內存中,Executor級別共享惟一的一份廣播變量,極大的減小網絡傳輸和內存消耗!
使用廣播變量的過程很簡單。
(1) 經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。任何可序列化的類型均可以這麼實現。
(2) 經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。
(3) 變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。
val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
注意事項:
能不能將一個RDD使用廣播變量廣播出去?不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。廣播變量只能在Driver端定義,不能在Executor端定義。在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。
當廣播一個比較大的值時,選擇既快又好的序列化格式是很重要的,由於若是序列化對象的時間很長或者傳送花費的時間過久,這段時間很容易就成爲性能瓶頸。尤爲是,Spark的 Scala 和 Java API 中默認使用的序列化庫爲 Java 序列化庫,所以它對於除基本類型的數組之外的任何對象都比較低效。你可使用 spark.serializer 屬性選擇另外一個序列化庫來優化序列化過程(後面會討論如何使用 Kryo 這種更快的序列化庫),也能夠爲你的數據類型實現本身的序列化方式(對 Java 對象使用 java.io.Externalizable 接口實現序列化,或使用 reduce() 方法爲 Python 的 pickle 庫定義自定義的序列化)。