前面的文章已經介紹了在spark streaming集成kafka時,如何處理其偏移量的問題,因爲spark streaming自帶的checkpoint弊端很是明顯,因此一些對數據一致性要求比較高的項目裏面,不建議採用其自帶的checkpoint來作故障恢復。git
在spark streaming1.3以後的版本支持direct kafka stream,這種策略更加完善,放棄了原來使用Kafka的高級API自動保存數據的偏移量,以後的版本採用Simple API也就是更加偏底層的api,咱們既能夠用checkpoint來容災,也能夠經過低級api來獲取偏移量本身管理偏移量,這樣以來不管是程序升級,仍是故障重啓,在框架端均可以作到Exact One準確一次的語義。github
本篇文章,會再介紹下,如何手動管理kafka的offset,並給出具體的代碼加以分析:apache
版本:api
apache spark streaming2.1框架
apache kafka 0.9.0.0spa
手動管理offset的注意點:debug
(1)第一次項目啓動的時候,由於zk裏面沒有偏移量,因此使用KafkaUtils直接建立InputStream,默認是從最新的偏移量開始消費,這一點能夠控制。code
(2)若是非第一次啓動,zk裏面已經存在偏移量,因此咱們讀取zk的偏移量,並把它傳入到KafkaUtils中,從上次結束時的偏移量開始消費處理。字符串
(3)在foreachRDD裏面,對每個批次的數據處理以後,再次更新存在zk裏面的偏移量get
注意上面的3個步驟,1和2只會加載一次,第3個步驟是每一個批次裏面都會執行一次。
下面看第一和第二個步驟的核心代碼:
/**** * * @param ssc StreamingContext * @param kafkaParams 配置kafka的參數 * @param zkClient zk鏈接的client * @param zkOffsetPath zk裏面偏移量的路徑 * @param topics 須要處理的topic * @return InputDStream[(String, String)] 返回輸入流 */ def createKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], zkClient: ZkClient, zkOffsetPath: String, topics: Set[String]): InputDStream[(String, String)]={ //目前僅支持一個topic的偏移量處理,讀取zk裏面偏移量字符串 val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last) val kafkaStream = zkOffsetData match { case None => //若是從zk裏面沒有讀到偏移量,就說明是系統第一次啓動 log.info("系統第一次啓動,沒有讀取到偏移量,默認就最新的offset開始消費") //使用最新的偏移量建立DirectStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) case Some(lastStopOffset) => log.info("從zk中讀取到偏移量,從上次的偏移量開始消費數據......") val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) //使用上次中止時候的偏移量建立DirectStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler) } kafkaStream//返回建立的kafkaStream }
主要是針對第一次啓動,和非首次啓動作了不一樣的處理。
而後看下第三個步驟的代碼:
/**** * 保存每一個批次的rdd的offset到zk中 * @param zkClient zk鏈接的client * @param zkOffsetPath 偏移量路徑 * @param rdd 每一個批次的rdd */ def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = { //轉換rdd爲Array[OffsetRange] val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //轉換每一個OffsetRange爲存儲到zk時的字符串格式 : 分區序號1:偏移量1,分區序號2:偏移量2,...... val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",") log.debug(" 保存的偏移量: "+offsetsRangesStr) //將最終的字符串結果保存到zk裏面 ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr) }
主要是更新每一個批次的偏移量到zk中。
例子已經上傳到github中,有興趣的同窗能夠參考這個連接:
https://github.com/qindongliang/streaming-offset-to-zk
後續文章會聊一下爲了升級應用如何優雅的關閉的流程序,以及在kafka擴展分區時,上面的程序如何自動兼容。