Kafka Offset管理及語義概念的理解

引語


  消費者須要本身保留一個offset,從kafka 獲取消息時,只拉去當前offset 之後的消息。 kafka offset的管理方式分爲兩種保存offset和不保存offset,通常保存offset採用的是外部存儲保護,這都要根據具體的業務狀況來定。使用外部存儲保存,咱們可把offset保存到Checkpoint, Hbase, Zookeeper, Kafka,接下來咱們就來offset保存的方式,各類方式使用的場景,關於至少一次語義,最多一次語義以及一次僅一次語義的一些相關概念,以及解決至少一次語義存在的數據重複和之多一次語義存在的數據丟失問題的方法。
Kafka Offset管理-Checkpoint
  一、啓用Spark Streaming的checkpoint是存儲偏移量最簡單的方法mysql

  二、流式checkpoint專門用於保存應用程序的狀態,好比保存在HDFS上,在故障時能恢復sql

  三、Spark Streaming的checkpoint沒法跨越應用程序進行恢復(這個機器上保存的offset,想在另一臺機器上恢復這個offset)服務器

  四、Spark升級也將致使沒法恢復(升級(API、版本迭代、邏輯修改)後會致使之前的checkpoint沒法使用)socket

  五、在關鍵生產應用,不建議使用spark檢查點管理offsetspa

Kafka Offset管理-Zookeeper


  receiver會自動將offset維護到zookeeper中。這裏的主要講的是用Direct方式,手動地將offset的值維護到zookeeper中。設計

  一、路徑    val zkPath = s"{kafkaOffsetRootPath}/{groupName}/{o.partition}/{o.partition}"對象

  二、若是Zookeeper中未保存offset,根據kafkaParam的配置使用最新或者最舊的offsetblog

  三、若是Zookeeper中有保存offset,咱們會利用這個offset做爲kafka的起始位置接口

Kafka Offset管理-Hbase


  一、基於Hbase的通用設計,使用同一張表保存能夠跨越多個spark Streaming程序的topic的offset事務

  二、rowkey = topic名稱 + groupID + Streaming的batchtime.milliSeconds.儘管batchtime.milliSeconds不是必須的,可是它能夠看到歷史的批處理任務對offset的管理狀況。

  三、kafka的offset保存在下面的表中,列簇爲offsets, 30天后自動過時Hbase表結構 create spark_kafka_offsets,{NAME => offset, TTL => 2592000}

  四、offset的獲取場景

    場景1:Streaming做業首次啓動。經過zookeeper來查找給定topic中分區的數量,而後返回「0」做爲全部topic分區的offset

    場景2:長時間運行的Streaming做業已經中止,新的分區被添加到kafka的topic中。經過zookeeper來查找給定topic中分區數量,對於全部舊的topic分區,將offset設置爲Hbase中的最新偏移量。對於全部新的topic分區,它將返回「0」做爲offset

    場景3:長時間運行的Streaming做業已經中止,topic的分區沒有任何更改。在這種狀況下,Hbase中發現的最新偏移量做爲每一個topic分區的offset返回。

Spark Streaming消費數據反寫kafka
  實現流程:

    一、flume將socket流數據採集到kafka

    二、Streaming讀取kafka的數據進行清洗

    三、將清洗後的數據再次寫到kafka

推薦方式


  一、將kafkaProducer對象廣播到全部executor節點,這樣就能夠在每一個executor節點將數據插入到kafka

  二、用partition的方式,一個rdd的partition對應一個kafkaProducer

生產環境中存在問題分析


  kafka的保存offset過時問題(也稱offset越界問題)

  緣由:segment過時致使offset在實際數據的offset以前(segment過時致使數據不存在了,可是在其餘地方還存在offset,當再次消費這個數據取出offset的時候就會出現數據找不到問題)

  解決方法:實現手動解決offset越界問題,須要把kafkaCluster類的內容拿過來,而且把包訪問權限去掉,具體實現查看MyCluster

數據峯值期間如何限速
  場景:Streaming宕機一段時間或數據峯值期間都會形成kafka數據積壓,若是不對Streaming的批次間隔作限速處理,在批次數據中會拉取不少的數據,這樣會影響處理效率。

  解決辦法:進行限速。限速參數:spark.streaming.kafka.maxRatePartition 每秒每一個分區獲取的記錄數

kafka的消息傳遞語義
  消息傳遞語義有:至少一次語義(at-least-once)、最多一次語義(at-most-once)、一次僅一次語義(exactly-once)。其中at-least-once和at-most-once,它們的使用會存在數據重複和數據丟失問題,可能出現這種狀況的緣由圖解以下:

exactly-once


  因爲,至少一次語義會致使數據重複,最多一次語義會致使數據的丟失,因此提出了一次僅一次語義,就能夠很好地解決了數據重複和數據丟失問題。那麼怎麼來實現一次僅一次語義?小編總結總結以下:

  一、冪等寫入:當獲取到數據後,先寫到MySQL,再保存offset,若是在寫到MySQL數據後,在保存offset以前宕機,重啓做業後也不會影響一次語義,由於會在MySQL重複更新須要設置好惟一的主鍵。好比Redis、MySQL,再好比每次往一個目錄覆寫數據,這樣主鍵不容易獲取

注:在軟件開發領域,冪等寫入即爲一樣的請求被執行一次與連續執行屢次的效果是同樣的,服務器的狀態也是同樣的,實際上就是接口的可重複調用(包括時間和空間的兩個維度)

  二、事物控制:保證數據和offset在同一個事務裏面,好比用mysql這樣須要事務存儲的支持。

  三、自定義實現:offset和數據綁定保存等

總結


        在offset的管理當中,使用checkepoint的方式來管理offset簡單易實現,可是若是進行程序迭代後其餘應用程序是獲取不到的,所以在實際生產環境中若是Streaming流式處理的複雜度不高,處理的數據比較小可使用checkpoint來進行offset的管理。通常狀況下,推薦使用zookeeper來進行offset的管理,儘管使用起來比checkpoint複雜,可是這種方式適用於比較複雜的Streaming,當機器升級或者應用程序改變時,其餘程序任然能夠獲取到zookeeper中的offset值。


更新時間
第一次更新時間:2018-12-09    增長了總結

相關文章
相關標籤/搜索