spark streaming checkpointing windows

spark streaming的相關概念html

  spark的核心是建立一個RDD對象,而後對RDD對象進行計算操做等java

  streaming能夠理解爲是 一個接二連三的數據流 ,而後將每一個固定時間段裏的數據構建成一個RDD,而後就會創一連串的RDD流,這就是DStream(streaming的主要操做對象)node

  batch  就是上面所說的固定時間段的時間表示,能夠爲5秒,10秒or ect ,DStream中每一個RDD接收數據的時間,也做 mini-batchpython

  window 用於裝載上述【一個bactch內數據所建立的RDD】   的容器,容器的大小用時間表示(毫秒,秒,分鐘等),必須爲 batch的倍數,window 內的RDD也會隨時間而不斷更新,window的更新間隔稱爲duration ,必須爲batch的倍數。git

以下圖 可理解爲一個RDD的batch爲5秒 ,一個window的容積是2個RDD的batch即10秒,window的duration爲一個batch 5秒。github

即window每次滑動的距離爲5秒,1個RDD。sql

 使用checkpointing更新數據的代碼示例apache

 統計文件中的數據,並使用updateStateByKey來更新結果app

 Scala版socket

package wordcounttest

import org.apache.spark._
import org.apache.spark.streaming._
import java.sql.Timestamp
import org.apache.spark.streaming.dstream.DStream

Object SteamingTest{
    def main(args:Array[String]){
        val conf = new SparkConf().setMaster("local[4]").setAppName("test1")
        val sc = new SparkContext(conf)
        // 這裏設置 Dstream 的batch 是 5 秒
        val ssc = new StreamingContext(sc,Seconds(5))
        val filestream = ssc.textFileStream("/home/spark/cho6input")  //設置dstream 數據來源,在ssc.start()後這個目錄中新生成的txt文件都會被讀入
         
        //定義一個處理order的class
        case class Order(time:java.sql.TimeStamp, OrderId:Long, clientId:Long,symbol:String,amount:Int,price:Double,buy:Boolean)

        val orders = filestream.flatMap(line => {
            val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")  //設置要解析的時間數據的字符型式
            val s = line.split(",")
            try{
                assert(s(6) == "B" || s(6) == "S")
                List(Order(new Timestamp(dateFormat.parse(s(0)).getTime()),s(1).toLong,s(2).toLong,s(3),s(4).toInt,s(5).toDouble,s(6) == "B"))
            }
            catch{
                case e: Throwable => println("Wrong line format(" + e + "):" +line)
                List()
            }
        })
        val numPerType = orders.map(o => (o.buy,1L)).reduceByKey((c1,c2) => c1+c2)
    
        val amountPerClient = orders.map(o => (o.clientId,o.amount*o.price))
   val amountState = amountPerClient.updateStateByKey((vals,totalOpt:Option[Double]) => { totalOpt match{ case Some(total) => Some(total +vals.sum) case None => Some(vals.sum) } } val updateAmountState = (clinetId:Long,amount:Option[Double],state:State[Double]) => { var total = amount.getOrElse(0.toDouble) if(state.exists()) total += state.get() state.update(total) Some((clientId,total)) }
  // map val amountState2 = amountPerClient.mapWithState(StateSpec.function(updateAmountState)).stateSnapshots() val top5clients = amountState.transform( _.sortBy(_._2,false).map(_._1).zipWithIndex.filter(x =>x._2 <5)) val buySellList = numPerType.map( t => if(t._1) ("BUYS",List(t._2.toString)) else ("SELLS",List(t._2.toString)) ) val top5clList = top5clients.repartition(1).map(x => x._1.toString).glom().map(arr => ("TOP5CLIENTS",arr.toList)) val finaleStream = buySellList.union(top5clList) finalStream.repartition(1).saveAsTextFiles("/home/spark/ch06output/output","txt") sc.setCheckpointDir("/home/spark/checkpoint") ssc.start()
}


 

  

 

updateStateByKey(func)使用說明

 updateStateByKey通常只須要一個參數 -- func(vals,optdata)  這個func的入參是(一個數據隊列vals 和一個Option類型的數據) 返回值是Option類型數據
vals是當前新的batch中接收的數據隊列 ,Option類型的數據是 上個batch的計算的保存的結果 , 返回值option類型的數據是當前計算的結果
經過設置 checkpoint 使用updateStateByKey計算所得的結果會自動保存,下個batch計算時會自動讀取

mapWithState(func)使用說明 

   mapWithState(func)的入參 也只需一個func ,這個func經過org.apache.spark.streaming.StateSpec.function()包裹

的一個方法,這個方法的形式爲(key,value,optionstate,maptypestate) 即入參爲 key,value,上次保存的option類型的狀態值,

返回值爲option類型的另外一種類型的狀態值。即上次保存的狀態值類型和 當前生成的狀態值類型能夠不一樣。

 

區別 updateStateByKey的func的入參 上次保存的狀態值 和 生成的狀態值類型 須要相同

mapWithState的func的入參 上次保存的狀態值 和 生成的狀態值類型 能夠不一樣

 

checkpointing說明

若是經過 sc.setCheckpointDir('some/path')設置了checkpoint的目錄

streaming就會自動在某些時刻 對當前的 DStreaming的信息保存,以便下一次須要歷史數據時不用從新計算,能夠直接獲取;

也可用於故障恢復,streaming重啓時根據以前保存 的RDD信息來建立RDD。

 

使用 checkpoint時通常只需設置CheckpointDir保存當前狀態信息的目錄,也可設置checkpoint的各個保存點的時間間隔,即每隔多長時間保存一次當前RDD的信息。

 

chekPoint 是spark streaming 內部自動機制,根據流程圖中是否有使用到【須要狀態保存點的轉換函數】來自動保存某個點的狀態信息及計算結果

checkpointing saves an RDD's data and its complete DAG(an RDD's calculation plan),當一個executor異常,RDD不須要從頭開始計算,能夠從硬盤

中讀取存儲的結果

 

【須要考慮過去的計算狀態的方法】spark主要提供了兩個updateStateByKey 和mapWithState  這兩個方法的使用者必須是PairDStream對象,

即(key,value)形式的DStream。 使用updateStateByKey時,state會隨每個新batch變化而變化

 

 

http://spark.apache.org/docs/2.3.3/streaming-programming-guide.html#checkpointing

若是須要應用從異常中進行故障恢復 , 就須要改進 streaming 應用,使其 實現了以下邏輯

  • 當應用是第一個被開啓, 則它會建立一個新的 StreamingContext ,來啓動全部的streams 而後call start()
  • 當應用是在故障後被重啓 ,則它會根據 checkpoint目錄中的checkpoint 數據來從新建立一個StreamingContext

實現邏輯  ,使用StreamingContext.getOrCreate

  Python

def functionToCreateContext():

  sc = SparkContext(...)

  ssc = StreamingContext(...)

  lines = ssc.socketTextStream(...)

  ...

  ssc.checkpoint(checkpointDirectory)   #設置 checkpoint 目錄

  return ssc 

context = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext)

#context的其餘的操做

context.  ...

 

context.start()

context.awaitTermination()

 

若是 checkpointDirectory 存在 ,context就會被根據checkpoint data從新建立

若是不存在(好比說第一次運行),方法 functionToCreateContext就會建立一個新的context 而後啓動DStreams          

python wordCount故障恢復示例

https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py

 

另外 要使用getOrCreate的特性,還須要讓driver進程在失敗時能自動重啓。這隻能經過運行這個application 的部署基礎設施來實現

 

RDD的checkpointing會致使性能消耗用於 保存到穩定的存儲系統 。 會致使執行了 checkpoint的那些batches處理時間變成 。

所以 checkpointing的時間間隔須要仔細設置 ,對於短期間隔的batch(好比1秒),在每一個batch都進行checkpointing會嚴重減小處理的數據量

checkpoint太頻繁會致使流程線延長和task size增加 。

對於須要rdd checkpointing 的狀態性的transformations ,默認的check 間隔是batch間隔的倍數,至少爲10秒。

能夠經過 dstream.checkpoint(checkpointInterval)進行設置 ,將checkpoint間隔設置爲sliding間隔的5-10倍是比較好的作法

 

Accumulators 累加器 ,Broadcast Variables 廣播變量 , checkpoints 檢查點

 

Accumulators ,Broadcast variables 並不能從spark streaming的checkpoint中恢復,因此若是你開啓了checkpointing 而且使用了Accumulators或Broadcast variables ,

就須要建立一個 惰性的可實例化單個實例的方法,是它們能夠在driver重啓時,從新實例化

def getWordBlacklist(sparkContext): if ("wordBlacklist" not in globals()): globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"]) return globals()["wordBlacklist"] def getDroppedWordsCounter(sparkContext): if ("droppedWordsCounter" not in globals()): globals()["droppedWordsCounter"] = sparkContext.accumulator(0) return globals()["droppedWordsCounter"] def echo(time, rdd): # Get or register the blacklist Broadcast blacklist = getWordBlacklist(rdd.context) # Get or register the droppedWordsCounter Accumulator droppedWordsCounter = getDroppedWordsCounter(rdd.context) # Use blacklist to drop words and use droppedWordsCounter to count them def filterFunc(wordCount): if wordCount[0] in blacklist.value: droppedWordsCounter.add(wordCount[1]) False else: True counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect()) wordCounts.foreachRDD(echo)










window窗口函數(scala版)
window的操做做用於一系列的mini-batches 上進行滑動,
每一個window DStream 決定因素 = ‘window的容量duration持續時間’+ ‘window的滑動間隔(即一個窗口的數據計算的頻度)’
這連個參數都是mini-batches的倍數
要統計每一個窗口中每種交易量的前5名能夠使用window方法,若是wnidow的mini-batch爲5分鐘,則表示 每5分鐘統計一次近60分鐘內交易量最大的前5名
先計算近1小時內每一個類型的總交易量
val stocksPerWindow = orders.map(x => (x.symbol, x.amount)).window(Minutes(60)).reduceByKey((a1:Int,a2:Int) => a1+a2)
再取前5名
val topStocks = stocksPerWindow.transform(_.sortBy(_._2,false).map(_._1).zipWithIndex.filter(x => x._2 < 5)).repartition(1).
map(x=>x._1.toString).glom().map(arr=>("TOP5STOCKS",arr.toList))
val finalStream = buySellList.union(top5clList).union(topStocks)
finalStream.repartition(1).saveAsTextFiles("/home/spark/ch06output/output","txt")
sc.setCheckpointDir("/home/spark/checkpoint/")
ssc.start()
window相關函數不須要設置checkpoint ,通常的DStream均可以使用,有些須要(k,v)類型的pair DStream才能使用

相關函數以下
window(winDur,[slideDur])

 函數

 說明
 window(winDur,[slideDur])   每隔slideDur時間段,就對windowDur中的RDD計算一次, winDur- 窗口的大小;slideDur窗口滑動的間隔,默認爲mini-batch(一個DStream的時間間隔)
 countByWindow(winDur,slideDur) 統計當前windowDur中全部RDD包含的元素個數 ,winDur- 窗口的大小;slideDur窗口滑動的間隔
 countByValueAndWindow(winDur,slideDur,[numParts]) 統計當前窗口中distinct元素(無重複元素)的個數,numParts用於修改當前DStream的默認分區數 
 reduceByWindow(reduceFunc,winDur,slideDur) 對window中全部RDD的元素應用 reduceFunc方法 
reduceByWindow(reduceFunc,invReduceFunc,winDur,slideDur) 更高效的reduceByWindow方法,對每一個window中全部RDD的元素應用reduceFunc方法;去除不屬於當前window的rdd元素,使用方法invReduceFunc
groupByKeyAndWindow(winDur,[slideDur],[numParts/partitioner])    根據key 對window中全部元素進行分組,slideDur可選參數,numParts設置
相關文章
相關標籤/搜索