Spark Streaming 之 Kafka 偏移量管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 消息的相關內容,文章着重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。java

1、開發環境

一、組件版本

  • CDH 集羣版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1

二、Maven 依賴

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基礎依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相關依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相關依賴 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相關依賴 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>
複製代碼

三、scala 編譯

在 pom.xml 的 build 節點下的 plugins 中添加 scala 編譯插件apache

<plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <executions>
      <execution>
        <goals>
          <goal>compile</goal>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
    <configuration>
      <scalaVersion>${scala.version}</scalaVersion>
      <args>
        <arg>-target:jvm-1.5</arg>
      </args>
    </configuration>
</plugin>
複製代碼

Maven 打包語句:mvn clean scala:compile compile package編程

四、打包注意事項

因爲 spark、spark-streaming、zookeeper 等均爲大數據集羣中必備的組件,所以與之相關的依賴無需打包到最終的 jar 包中,能夠將其 scope 設置爲 provided 便可;不然最終的 jar 包會至關龐大。bootstrap

2、Kafka 偏移量

一、偏移量(offset)

這裏的偏移量是指 kafka consumer offset,在 Kafka 0.9 版本以前消費者偏移量默認被保存在 zookeeper 中(/consumers/<group.id>/offsets/<topic>/<partitionId>),所以在初始化消費者的時候須要指定 zookeeper.hostsbash

隨着 Kafka consumer 在實際場景的不斷應用,社區發現舊版本 consumer 把位移提交到 ZooKeeper 的作法並不合適。ZooKeeper 本質上只是一個協調服務組件,它並不適合做爲位移信息的存儲組件,畢竟頻繁高併發的讀/寫操做並非 ZooKeeper 擅長的事情。所以在 0.9 版本開始 consumer 將位移提交到 Kafka 的一個內部 topic(__consumer_offsets)中,該主題默認有 50 個分區,每一個分區 3 個副本。併發

二、消息交付語義

  • at-most-once:最多一次,消息可能丟失,但不會被重複處理;
  • at-least-once:至少一次,消息不會丟失,但可能被處理屢次;
  • exactly-once:精確一次,消息必定會被處理且只會被處理一次。

若 consumer 在消息消費以前就提交位移,那麼即可以實現 at-most-once,由於若 consumer 在提交位移與消息消費之間崩潰,則 consumer 重啓後會重新的 offset 位置開始消費,前面的那條消息就丟失了;相反地,若提交位移在消息消費以後,則可實現 at-least-once 語義。因爲 Kafka 沒有辦法保證這兩步操做能夠在同一個事務中完成,所以 Kafka 默認提供的就是 at-least-once 的處理語義。app

三、offset 提交方式

默認狀況下,consumer 是自動提交位移的,自動提交間隔是 5 秒,能夠經過設置 auto.commit.interval.ms 參數能夠控制自動提交的間隔。自動位移提交的優點是下降了用戶的開發成本使得用戶沒必要親自處理位移提交;劣勢是用戶不能細粒度地處理位移的提交,特別是在有較強的精確一次處理語義時(在這種狀況下,用戶可使用手動位移提交)。jvm

所謂的手動位移提交就是用戶自行肯定消息什麼時候被真正處理完並能夠提交位移,用戶能夠確保只有消息被真正處理完成後再提交位移。若是使用自動位移提交則沒法保證這種時序性,所以在這種狀況下必須使用手動提交位移。設置使用手動提交位移很是簡單,僅僅須要在構建 KafkaConsumer 時設置 enable.auto.commit=false,而後調用 commitSync 或 commitAsync 方法便可。maven

3、使用 Zookeeper 管理 Kafka 偏移量

一、Zookeeper 管理偏移量的優點

雖說新版 kafka 中已經無需使用 zookeeper 管理偏移量了,可是使用 zookeeper 管理偏移量相比 kafka 自行管理偏移量有以下幾點好處:ide

  1. 可使用 zookeeper 管理工具輕鬆查看 offset 信息;
  2. 無需修改 groupId 便可從頭讀取消息;
  3. 特別狀況下能夠人爲修改 offset 信息。

藉助 zookeeper 管理工具能夠對任何一個節點的信息進行修改、刪除,若是但願從最開始讀取消息,則只須要刪除 zk 某個節點的數據便可。

二、Zookeeper 偏移量管理實現

import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.JavaConverters._

class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot : () => String) {

  // 定義爲 lazy 實現了懶漢式的單例模式,解決了序列化問題,方便使用 broadcast
  lazy val zkClient: ZkClient = getClient()
  lazy val zkRoot: String = getZkRoot()

  // offsetId = md5(groupId+join(topics))
  // 初始化偏移量的 zk 存儲路徑 zkRoot
  def initOffset(offsetId: String) : Unit = {
    if(!zkClient.exists(zkRoot)){
      zkClient.createPersistent(zkRoot, true)
    }
  }

  // 從 zkRoot 讀取偏移量信息
  def getOffset(): Map[TopicPartition, Long] = {
    val keys = zkClient.getChildren(zkRoot)
    var initOffsetMap: Map[TopicPartition, Long] = Map()
    if(!keys.isEmpty){
      for (k:String <- keys.asScala) {
        val ks = k.split("!")
        val value:Long = zkClient.readData(zkRoot + "/" + k)
        initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value)
      }
    }
    initOffsetMap
  }

  // 根據單條消息,更新偏移量信息
  def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = {
    val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition
    zkClient.writeData(path, consumeRecord.offset())
    true
  }

  // 消費消息前,批量更新偏移量信息
  def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
      val path = zkRoot + "/" + offset.topic + "!" + offset.partition
      if(!zkClient.exists(path)){
        zkClient.createPersistent(path, offset.fromOffset)
      }
      else{
        zkClient.writeData(path, offset.fromOffset)
      }
    }
    true
  }

  // 消費消息後,批量提交偏移量信息
  def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
      val path = zkRoot + "/" + offset.topic + "!" + offset.partition
      if(!zkClient.exists(path)){
        zkClient.createPersistent(path, offset.untilOffset)
      }
      else{
        zkClient.writeData(path, offset.untilOffset)
      }
    }
    true
  }

  def finalize(): Unit = {
    zkClient.close()
  }
}

object ZkKafkaOffset{
  def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = {
    val getClient = () =>{
      val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181")
      new ZkClient(zkHost, 30000)
    }
    val getZkRoot = () =>{
      val zkRoot = "/kafka/ss/offset/" + offsetId
      zkRoot
    }
    new ZkKafkaOffset(getClient, getZkRoot)
  }
}
複製代碼

三、Spark Streaming 消費 Kafka 消息

import scala.collection.JavaConverters._

object RtDataLoader {
  def main(args: Array[String]): Unit = {
    // 從配置文件讀取 kafka 配置信息
    val props = new Props("xxx.properties")
    val groupId = props.getStr("groupId", "")
    if(StrUtil.isBlank(groupId)){
      StaticLog.error("groupId is empty")
      return
    }
    val kfkServers = props.getStr("kfk_servers")
    if(StrUtil.isBlank(kfkServers)){
      StaticLog.error("bootstrap.servers is empty")
      return
    }
    val topicStr = props.getStr("topics")
    if(StrUtil.isBlank(kfkServers)){
      StaticLog.error("topics is empty")
      return
    }

    // KAFKA 配置設定
    val topics = topicStr.split(",")
    val kafkaConf = Map[String, Object](
      "bootstrap.servers" -> kfkServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "receive.buffer.bytes" -> (102400: java.lang.Integer),
      "max.partition.fetch.bytes" -> (5252880: java.lang.Integer),
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]")

    // streaming 相關配置
    conf.set("spark.streaming.stopGracefullyOnShutdown","true")
    conf.set("spark.streaming.backpressure.enabled","true")
    conf.set("spark.streaming.backpressure.initialRate","1000")

    // 設置 zookeeper 鏈接信息
    conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "sky-01:2181"))

    // 建立 StreamingContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    // 根據 groupId 和 topics 獲取 offset
    val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
    val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId)
    kafkaOffset.initOffset(ssc, offsetId)
    val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)

    // 建立數據流
    var stream:InputDStream[ConsumerRecord[String, String]] = null
    if(topicStr.contains("*")) {
      StaticLog.warn("使用正則匹配讀取 kafka 主題:" + topicStr)
      stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
    }
    else {
      StaticLog.warn("待讀取的 kafka 主題:" + topicStr)
      stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
    }

    // 消費數據
    stream.foreachRDD(rdd => {
      // 消息消費前,更新 offset 信息
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaOffset.updateOffset(offsetRanges)
      
      //region 處理詳情數據
      StaticLog.info("開始處理 RDD 數據!")
      //endregion
      
      // 消息消費結束,提交 offset 信息
      kafkaOffset.commitOffset(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
複製代碼

四、注意事項

auto.offset.reset

對於 auto.offset.reset 我的推薦設置爲 earliest,初次運行的時候,因爲 __consumer_offsets 沒有相關偏移量信息,所以消息會從最開始的地方讀取;當第二次運行時,因爲 __consumer_offsets 已經存在消費的 offset 信息,所以會根據 __consumer_offsets 中記錄的偏移信息繼續讀取數據。

此外,對於使用 zookeeper 管理偏移量而言,只須要刪除對應的節點,數據便可從頭讀取,也是很是方便。不過若是你但願從最新的地方讀取數據,不須要讀取舊消息,則能夠設置爲 latest。

基於正則訂閱 Kafka 主題

基於正則訂閱主題,有如下好處:

  • 無需羅列主題名,一兩個主題還好,若是有幾十個,羅列過於麻煩了;
  • 可實現動態訂閱的效果(新增的符合正則的主題也會被讀取)。
stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
複製代碼

SparkStreaming 序列化問題

開發 SparkStreaming 程序的每一個人都會遇到各類各樣的序列化問題,簡單來講:在 driver 中使用到的變量或者對象無需序列化,傳遞到 exector 中的變量或者對象須要序列化。所以推薦的作法是,在 exector 中最好只處理數據的轉換,在 driver 中對處理的結果進行存儲等操做。

stream.foreachRDD(rdd => {
  // driver 代碼運行區域
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaOffset.updateOffset(offsetRanges)
  
  // exector 代碼運行區域
  val resultRDD = rdd.map(xxxxxxxx)
  //endregion
  
  //對結果進行存儲
  resultRDD.saveToES(xxxxxx)
  kafkaOffset.commitOffset(offsetRanges)
})
複製代碼

文中部分概念摘自《Kafka 實戰》,一本很是棒的書籍,推薦一下。


Any Code,Code Any!

掃碼關注『AnyCode』,編程路上,一塊兒前行。

相關文章
相關標籤/搜索