Spark流式狀態管理(updateStateByKey、mapWithState等)

一般使用Spark的流式框架如Spark Streaming,作無狀態的流式計算是很是方便的,僅需處理每一個批次時間間隔內的數據便可,不須要關注以前的數據,這是創建在業務需求對批次之間的數據沒有聯繫的基礎之上的。redis

但若是咱們要跨批次作一些數據統計,好比batch是3秒,但要統計每1分鐘的用戶行爲,那麼就要在整個流式鏈條中維護一個狀態來保存近1分鐘的用戶行爲。apache

那麼若是維護這樣一個狀態呢?通常狀況下,主要經過如下幾種方式:微信

1. spark內置算子:updateStateByKey、mapWithState框架

2. 第三方存儲系統維護狀態:如redis、alluxio、HBase這裏主要以spark內置算子:updateStateByKey、mapWithState爲例,經過一些示例代碼(不涉及offset管理),來看看如何進行狀態維護。函數

 

updateStateByKey性能

分析相關源碼發現,這個算子的核心思想就是將以前有狀態的RDD和當前的RDD作一次cogroup,獲得一個新的狀態的RDD,具備以下特色:學習

1. 能夠設置初始狀態大數據

2. key超時刪除。用updatefunc返回None值。updateFunc無論是否有已保存狀態key的新數據到來,都會被已存在狀態的key調用,新增的key也會調用3. 不適合大數據量狀態存儲,尤爲是key的維度比較高、value狀態比較大的spa

/**
* @author:微信公衆號:大數據學習與分享
*/
object StateOperator {

  private val brokers = "kafka-1:9092,kafka-2:9092,kafka-3:9092"
  private val topics = "test"
  private val groupId = "test"
  private val batchTime = "10"
  private val mapwithstateCKDir = "/mapwithstate"
  private val updateStateByKeyCKDir = "/mapwithstate"

  def main(args: Array[String]): Unit = {
    
    val ssc = StreamingContext.getOrCreate(updateStateByKeyCKDir, () => createContext(brokers, topics, groupId, batchTime, updateStateByKeyCKDir))

    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(brokers: String, topics: String,
                    groupId: String, batchTime: String,
                    checkpointDirectory: String): StreamingContext = {

    val conf = new SparkConf().setAppName("testState").setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "5")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(batchTime.toInt))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")

    val streams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    ssc.checkpoint("/redis/updateStateByKey")

    val initialRDD = ssc.sparkContext.parallelize(List(("word", 0)))

    //updateStateByKey 底層核心是對preStateRDD(以前數據狀態的RDD)和當前批次的RDD進行cogroup
    val stateStreams = streams.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

    stateStreams.checkpoint(Duration(5))

    stateStreams.foreachRDD { rdd =>
      val res = rdd.map { case (word, count) => (count, word) }.sortByKey(false).take(10).map { case (v, k) => (k, v) }
      res.foreach(println)
    }

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  //不管當前批次RDD有多少key(好比preStateRDD有而當前批次沒有)都須要對全部的數據進行cogroup並調用一次定義的updateFunc函數
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap(t => Some(t._2.sum + t._3.getOrElse(0)).map(v => (t._1, v)))
  }

}

 

經過updateStateByKey得到的是整個狀態的數據,並且每次狀態更新時都要將當前批次過來的數據與以前保存的狀態進行cogroup操做,而且對全部數據都調用自定義的函數進行一次計算。code

隨着時間推移,數據量不斷增加,須要維護的狀態愈來愈大,會很是影響性能。若是不能在當前批次將數據處理完成,很容易形成數據堆積,影響程序穩定運行甚至宕掉,這就引出了mapWithState。

 

mapWithState

支持輸出全量的狀態和更新的狀態,還支持對狀態超時管理,用戶能夠根據業務需求選擇須要的輸出,性能優於於updateStateByKey。

def main(args: Array[String]): Unit = {
    //單詞統計
    val ssc = StreamingContext.getOrCreate(mapwithstateCKDir,
     () => createContext(brokers, topics, groupId, batchTime, mapwithstateCKDir))

    ssc.start()
    ssc.awaitTermination()
}

def createContext(brokers: String, topics: String,
                 groupId: String, batchTime: String,
                 checkpointDirectory: String): StreamingContext = {

 val conf = new SparkConf().setAppName("testState").setMaster("local[*]")
   .set("spark.streaming.kafka.maxRatePerPartition", "5")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 val ssc = new StreamingContext(conf, Seconds(batchTime.toInt))

 val topicsSet = topics.split(",").toSet
 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
   "group.id" -> groupId,
   "auto.offset.reset" -> "smallest")

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)

   val stateStreams = messages.mapWithState(StateSpec.function(mapFunc).timeout(Seconds(60))).stateSnapshots()
   //.checkpoint(Duration(5))

   stateStreams.foreachRDD { (rdd, time) =>
     println("========do something")
   }

   ssc.checkpoint(checkpointDirectory)
   ssc
 }

 //key爲word,value爲當前批次值,state爲本批次以前的狀態值
 val mapFunc = (key: String, value: Option[Long], state: State[Long]) => {
   //檢測是否過時
   if (state.isTimingOut()) {
     println(s"$key is timing out")
   } else {
     val sum = state.getOption().getOrElse(0L) + value.getOrElse(0L)
     val output = (key, sum)
     //更新狀態
     state.update(sum)
     output
   }
 }
 
 val mapFunction = (time: Time, word: String, count: Option[Int], state: State[Int]) => {
   val sum = count.getOrElse(0) + state.getOption().getOrElse(0)
   val output = (word, sum)
   state.update(sum)
   Option(output)
 }

 

雖然mapWithState相對於updateStateByKey性能更優,但仍然不適合大數據量的狀態維護,此時就須要借用第三方存儲來進行狀態的維護了,redis、alluxio、HBase是經常使用的選擇。

redis比較適合維護key具備超時處理機制的場景使用;alluxio的吞吐量更高,適合於數據量更大時的場景處理。

具體採用哪一種方式,要結合實際的業務場景、數據量、性能等多方面的考量。


 

關注微信公衆號:大數據學習與分享,獲取更對技術乾貨

相關文章
相關標籤/搜索