如何管理Spark Streaming消費Kafka的偏移量(三)

前面的文章已經介紹了在spark streaming集成kafka時,如何處理其偏移量的問題,因爲spark streaming自帶的checkpoint弊端很是明顯,因此一些對數據一致性要求比較高的項目裏面,不建議採用其自帶的checkpoint來作故障恢復。git

在spark streaming1.3以後的版本支持direct kafka stream,這種策略更加完善,放棄了原來使用Kafka的高級API自動保存數據的偏移量,以後的版本採用Simple API也就是更加偏底層的api,咱們既能夠用checkpoint來容災,也能夠經過低級api來獲取偏移量本身管理偏移量,這樣以來不管是程序升級,仍是故障重啓,在框架端均可以作到Exact One準確一次的語義。github

本篇文章,會再介紹下,如何手動管理kafka的offset,並給出具體的代碼加以分析:apache

版本:api

apache spark streaming2.1框架

apache kafka 0.9.0.0spa

手動管理offset的注意點:debug

(1)第一次項目啓動的時候,由於zk裏面沒有偏移量,因此使用KafkaUtils直接建立InputStream,默認是從最新的偏移量開始消費,這一點能夠控制。code

(2)若是非第一次啓動,zk裏面已經存在偏移量,因此咱們讀取zk的偏移量,並把它傳入到KafkaUtils中,從上次結束時的偏移量開始消費處理。字符串

(3)在foreachRDD裏面,對每個批次的數據處理以後,再次更新存在zk裏面的偏移量get

注意上面的3個步驟,1和2只會加載一次,第3個步驟是每一個批次裏面都會執行一次。

下面看第一和第二個步驟的核心代碼:

/****
    *
    * @param ssc  StreamingContext
    * @param kafkaParams  配置kafka的參數
    * @param zkClient  zk鏈接的client
    * @param zkOffsetPath zk裏面偏移量的路徑
    * @param topics     須要處理的topic
    * @return   InputDStream[(String, String)] 返回輸入流
    */
  def createKafkaStream(ssc: StreamingContext,
                        kafkaParams: Map[String, String],
                        zkClient: ZkClient,
                        zkOffsetPath: String,
                        topics: Set[String]): InputDStream[(String, String)]={
    //目前僅支持一個topic的偏移量處理,讀取zk裏面偏移量字符串
    val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)

    val kafkaStream = zkOffsetData match {
      case None =>  //若是從zk裏面沒有讀到偏移量,就說明是系統第一次啓動
        log.info("系統第一次啓動,沒有讀取到偏移量,默認就最新的offset開始消費")
        //使用最新的偏移量建立DirectStream
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      case Some(lastStopOffset) =>
        log.info("從zk中讀取到偏移量,從上次的偏移量開始消費數據......")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        //使用上次中止時候的偏移量建立DirectStream
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)
    }
    kafkaStream//返回建立的kafkaStream
  }

主要是針對第一次啓動,和非首次啓動作了不一樣的處理。

而後看下第三個步驟的代碼:

/****
    * 保存每一個批次的rdd的offset到zk中
    * @param zkClient zk鏈接的client
    * @param zkOffsetPath   偏移量路徑
    * @param rdd     每一個批次的rdd
    */
  def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
    //轉換rdd爲Array[OffsetRange]
    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //轉換每一個OffsetRange爲存儲到zk時的字符串格式 :  分區序號1:偏移量1,分區序號2:偏移量2,......
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
    log.debug(" 保存的偏移量:  "+offsetsRangesStr)
    //將最終的字符串結果保存到zk裏面
    ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
  }

主要是更新每一個批次的偏移量到zk中。

例子已經上傳到github中,有興趣的同窗能夠參考這個連接:

https://github.com/qindongliang/streaming-offset-to-zk

後續文章會聊一下爲了升級應用如何優雅的關閉的流程序,以及在kafka擴展分區時,上面的程序如何自動兼容。

相關文章
相關標籤/搜索