在前面總結的幾篇spark踩坑博文中,我總結了本身在使用spark過程中踩過的一些坑和經驗。咱們知道Spark是多機器集羣部署的,分爲Driver/Master/Worker,Master負責資源調度,Worker是不一樣的運算節點,由Master統一調度,而Driver是咱們提交Spark程序的節點,而且全部的reduce類型的操做都會彙總到Driver節點進行整合。節點之間會將map/reduce等操做函數傳遞一個獨立副本到每個節點,這些變量也會複製到每臺機器上,而節點之間的運算是相互獨立的,變量的更新並不會傳遞迴Driver程序。那麼有個問題,若是咱們想在節點之間共享一份變量,好比一份公共的配置項,該怎麼辦呢?Spark爲咱們提供了兩種特定的共享變量,來完成節點間變量的共享。
本文首先簡單的介紹spark以及spark streaming中累加器和廣播變量的使用方式,而後重點介紹一下如何更新廣播變量。java
顧名思義,累加器是一種只能經過關聯操做進行「加」操做的變量,所以它可以高效的應用於並行操做中。它們可以用來實現counters和sums。Spark原生支持數值類型的累加器,開發者能夠本身添加支持的類型,在2.0.0以前的版本中,經過繼承AccumulatorParam來實現,而2.0.0以後的版本須要繼承AccumulatorV2來實現自定義類型的累加器。
若是建立了一個具名的累加器,它能夠在spark的UI中顯示。這對於理解運行階段(running stages)的過程有很重要的做用。以下圖:
在2.0.0以前版本中,累加器的聲明使用方式以下:node
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10
累加器的聲明在2.0.0發生了變化,到2.1.0也有所變化,具體能夠參考官方文檔,咱們這裏以2.1.0爲例將代碼貼一下:git
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10
累加器比較簡單直觀,若是咱們須要在spark中進行一些全局統計就可使用它。可是有時候僅僅一個累加器並不能知足咱們的需求,好比數據庫中一份公共配置表格,須要同步給各個節點進行查詢。OK先來簡單介紹下spark中的廣播變量:
廣播變量容許程序員緩存一個只讀的變量在每臺機器上面,而不是每一個任務保存一份拷貝。例如,利用廣播變量,咱們可以以一種更有效率的方式將一個大數據量輸入集合的副本分配給每一個節點。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減小通訊的成本。
一個廣播變量能夠經過調用SparkContext.broadcast(v)方法從一個初始變量v中建立。廣播變量是v的一個包裝變量,它的值能夠經過value方法訪問,下面的代碼說明了這個過程:程序員
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
從上文咱們能夠看出廣播變量的聲明很簡單,調用broadcast就能搞定,而且scala中一切可序列化的對象都是能夠進行廣播的,這就給了咱們很大的想象空間,能夠利用廣播變量將一些常常訪問的大變量進行廣播,而不是每一個任務保存一份,這樣能夠減小資源上的浪費。算法
廣播變量能夠用來更新一些大的配置變量,好比數據庫中的一張表格,那麼有這樣一個問題,若是數據庫當中的配置表格進行了更新,咱們須要從新廣播變量該怎麼作呢。上文對廣播變量的說明中,咱們知道廣播變量是隻讀的,也就是說廣播出去的變量無法再修改,那麼咱們應該怎麼解決這個問題呢?
答案是利用spark中的unpersist函數sql
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.數據庫
上文是從spark官方文檔摘抄出來的,咱們能夠看出,正常來講每一個節點的數據是不須要咱們操心的,spark會自動按照LRU規則將老數據刪除,若是須要手動刪除能夠調用unpersist函數。
那麼更新廣播變量的基本思路:將老的廣播變量刪除(unpersist),而後從新廣播一遍新的廣播變量,爲此簡單包裝了一個用於廣播和更新廣播變量的wraper類,以下:apache
import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag // This wrapper lets us update brodcast variables within DStreams' foreachRDD // without running into serialization issues case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { // 刪除RDD是否須要鎖定 v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } }
利用該wrapper更新廣播變量,大體的處理邏輯以下:緩存
// 定義 val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue) yourStream.transform(rdd => { //按期更新廣播變量 if (System.currentTimeMillis - someTime > Conf.updateFreq) { yourBroadcast.update(newValue, true) } // do something else })
spark中的共享變量是咱們可以在全局作出一些操做,好比record總數的統計更新,一些大變量配置項的廣播等等。而對於廣播變量,咱們也能夠監控數據庫中的變化,作到定時的從新廣播新的數據表配置狀況,另外我使用上述方式,在天天千萬級的數據實時流統計中表現穩定,因此有類似問題的同窗也能夠進行嘗試,有任何問題,歡迎隨時騷擾溝通^v^
廣告下咱們項目:專一於遊戲輿情的挖掘分析,歡迎你們來踩踩
http://wetest.qq.com/bee/