Kafka 日誌存儲

在進行詳解以前,我想先聲明一下,本次咱們進行講解說明的是 Kafka 消息存儲的信息文件內容,不是所謂的 Kafka 服務器運行產生的日誌文件,這一點但願你們清楚。shell

Kafka 消息是以主題爲單位進行歸類,各個主題之間是彼此獨立的,互不影響。每一個主題又能夠分爲一個或多個分區。每一個分區各自存在一個記錄消息數據的日誌文件。也就是該文要着重關注的內容。咱們根據以下的圖進行進一步說明:服務器

圖片描述

圖中,建立了一個 demo-topic 主題,其存在 7 個 Parition,對應的每一個 Parition 下存在一個 [Topic-Parition] 命名的消息日誌文件。在理想狀況下,數據流量分攤到各個 Parition 中,實現了負載均衡的效果。在分區日誌文件中,你會發現不少類型的文件,好比:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就稱爲 LogSement。咱們先留有這樣的一個總體的日誌結構概念,接下來咱們一一的進行詳細的說明其中的設計。負載均衡

LogSegment

咱們已經知道分區日誌文件中包含不少的 LogSegment ,Kafka 日誌追加是順序寫入的,LogSegment 能夠減少日誌文件的大小,進行日誌刪除的時候和數據查找的時候能夠快速定位。同時,ActiveLogSegment 也就是活躍的日誌分段擁有文件擁有寫入權限,其他的 LogSegment 只有只讀的權限。spa

日誌文件存在多種後綴文件,重點須要關注 .index、.timestamp、.log 三種類型。其餘的日誌類型功能做用,請查詢下面圖表:線程

類別 做用
.index 偏移量索引文件
.timestamp 時間戳索引文件
.log 日誌文件
.snaphot 快照文件
.deleted
.cleaned 日誌清理時臨時文件
.swap Log Compaction 以後的臨時文件
Leader-epoch-checkpoint

每一個 LogSegment 都有一個基準偏移量,用來表示當前 LogSegment 中第一條消息的 offset。偏移量是一個 64 位的長整形數,固定是20位數字,長度未達到,用 0 進行填補,索引文件和日誌文件都由該做爲文件名命名規則(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特別說明一下,若是日誌文件名爲 00000000000000000121.log ,則當前日誌文件的一條數據偏移量就是 121,偏移量是從 0 開始的。設計

若是想要查看相應文件內容能夠經過 kafka-run-class.sh 腳本查看 .log3d

/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

圖片描述

2.0 中可使用 kafka-dump-log.sh 查 看.index 文件日誌

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index

日誌與索引文件

配置項 默認值 說明
log.index.interval.bytes 4096 (4K) 增長索引項字節間隔密度,會影響索引文件中的區間密度和查詢效率
log.segment.bytes 1073741824 (1G) 日誌文件最大值
log.roll.ms 當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值容許的最大範圍,毫秒維度
log.roll.hours 168 (7天) 當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值容許的最大範圍,小時維度
log.index.size.max.bytes 10485760 (10MB) 觸發偏移量索引文件或時間戳索引文件分段字節限額

偏移量索引文件用於記錄消息偏移量與物理地址之間的映射關係。時間戳索引文件則根據時間戳查找對應的偏移量。code

Kafka 中的索引文件是以稀疏索引的方式構造消息的索引,他並不保證每個消息在索引文件中都有對應的索引項。每當寫入必定量的消息時,偏移量索引文件和時間戳索引文件分別增長一個偏移量索引項和時間戳索引項,經過修改 log.index.interval.bytes 的值,改變索引項的密度。對象

切分文件

從上文中可知,日誌文件和索引文件都會存在多個文件,組成多個 SegmentLog,那麼其切分的規則是怎樣的呢?

當知足以下幾個條件中的其中之一,就會觸發文件的切分:

  1. 當前日誌分段文件的大小超過了 broker 端參數 log.segment.bytes 配置的值。log.segment.bytes 參數的默認值爲 1073741824,即 1GB。
  2. 當前日誌分段中消息的最大時間戳與當前系統的時間戳的差值大於 log.roll.mslog.roll.hours 參數配置的值。若是同時配置了 log.roll.mslog.roll.hours 參數,那麼 log.roll.ms 的優先級高。默認狀況下,只配置了 log.roll.hours 參數,其值爲168,即 7 天。
  3. 偏移量索引文件或時間戳索引文件的大小達到 broker 端參數 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默認值爲 10485760,即 10MB。
  4. 追加的消息的偏移量與當前日誌分段的偏移量之間的差值大於 Integer.MAX_VALUE,即要追加的消息的偏移量不能轉變爲相對偏移量。
爲何是 Integer.MAX_VALUE

在偏移量索引文件中,每一個索引項共佔用 8 個字節,並分爲兩部分。相對偏移量和物理地址。

相對偏移量:表示消息相對與基準偏移量的偏移量,佔 4 個字節

物理地址:消息在日誌分段文件中對應的物理位置,也佔 4 個字節

4 個字節恰好對應 Integer.MAX_VALUE ,若是大於 Integer.MAX_VALUE ,則不能用 4 個字節進行表示了。

索引文件切分過程

索引文件會根據 log.index.size.max.bytes 值進行預先分配空間,即文件建立的時候就是最大值,當真正的進行索引文件切分的時候,纔會將其裁剪到實際數據大小的文件。這一點是跟日誌文件有所區別的地方。其意義下降了代碼邏輯的複雜性。

查找消息

offset 查詢

偏移量索引由相對偏移量和物理地址組成。

圖片描述

能夠經過以下命令解析.index 文件

/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
offset:0 position:0
offset:20 position:320
offset:43 position:1220
注意:offset 與 position 沒有直接關係哦,因爲存在數據刪除和日誌清理。

圖片描述

e.g. 如何查看 偏移量爲 23 的消息?

Kafka 中存在一個 ConcurrentSkipListMap 來保存在每一個日誌分段,經過跳躍表方式,定位到在 00000000000000000000.index ,經過二分法在偏移量索引文件中找到不大於 23 的最大索引項,即 offset 20 那欄,而後從日誌分段文件中的物理位置爲320 開始順序查找偏移量爲 23 的消息。

時間戳方式查詢

在上文已經有所說起,經過時間戳方式進行查找消息,須要經過查找時間戳索引和偏移量索引兩個文件。

時間戳索引索引格式

圖片描述

圖片描述

e.g. 查找時間戳爲 1557554753430 開始的消息?

  • 將 1557554753430 和每一個日誌分段中最大時間戳 largestTimeStamp 逐一對比,直到找到不小於 1557554753430 所對應的日誌分段。日誌分段中的 largestTimeStamp 的計算是先查詢該日誌分段所對應時間戳索引文件,找到最後一條索引項,若最後一條索引項的時間戳字段值大於 0 ,則取該值,不然去該日誌分段的最近修改時間。
  • 找到相應日誌分段以後,使用二分法進行定位,與偏移量索引方式相似,找到不大於 1557554753430 最大索引項,也就是 [1557554753420 430]。
  • 拿着偏移量爲 430 到偏移量索引文件中使用二分法找到不大於 430 最大索引項,即 [20,320] 。
  • 日誌文件中從 320 的物理位置開始查找不小於 1557554753430 數據。
注意:timestamp文件中的 offset 與 index 文件中的 relativeOffset 不是一一對應的哦。由於數據的寫入是各自追加。

在偏移量索引文件中,索引數據都是順序記錄 offset ,但時間戳索引文件中每一個追加的索引時間戳必須大於以前追加的索引項,不然不予追加。在 Kafka 0.11.0.0 之後,消息信息中存在若干的時間戳信息。若是 broker 端參數 log.message.timestamp.type 設置爲 LogAppendTIme ,那麼時間戳一定能保持單調增加。反之若是是 CreateTime 則沒法保證順序。

日誌清理

日誌清理,不是日誌刪除哦,這仍是有所區別的,日誌刪除會在下文進行說明。

Kafka 提供兩種日誌清理策略:

日誌刪除:按照必定的刪除策略,將不知足條件的數據進行數據刪除

日誌壓縮:針對每一個消息的 Key 進行整合,對於有相同 Key 的不一樣 Value 值,只保留最後一個版本。

Kafka 提供 log.cleanup.policy 參數進行相應配置,默認值:delete,還能夠選擇 compact。

是否支持針對具體的 Topic 進行配置?

答案是確定的,主題級別的配置項是 cleanup.policy

日誌刪除

配置 默認值 說明
log.retention.check.interval.ms 300000 (5分鐘) 檢測頻率
log.retention.hours 168 (7天) 日誌保留時間小時
log.retention.minutes 日誌保留時間分鐘
log.retention.ms 日誌保留時間毫秒
file.delete.delay.ms 60000 (1分鐘) 延遲執行刪除時間
log.retention.bytes -1 無窮大 運行保留日誌文件最大值
log.retention.bytes 1073741824 (1G) 日誌文件最大值

Kafka 會週期性根據相應規則進行日誌數據刪除,保留策略有 3 種:基於時間的保留策略、基於日誌大小的保留策略和基於日誌其實偏移量的保留策略。

基於時間

日誌刪除任務會根據 log.retention.hours/log.retention.minutes/log.retention.ms 設定日誌保留的時間節點。若是超過該設定值,就須要進行刪除。默認是 7 天,log.retention.ms 優先級最高。

如何查找日誌分段文件中已通過去的數據呢?

Kafka 依據日誌分段中最大的時間戳進行定位,首先要查詢該日誌分段所對應的時間戳索引文件,查找時間戳索引文件中最後一條索引項,若最後一條索引項的時間戳字段值大於 0,則取該值,不然取最近修改時間。

爲何不直接選最近修改時間呢?

由於日誌文件能夠有意無心的被修改,並不能真實的反應日誌分段的最大時間信息。

刪除過程
  1. 從日誌對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,保證沒有線程對這些日誌分段進行讀取操做。
  2. 這些日誌分段全部文件添加 上 .delete 後綴。
  3. 交由一個以 "delete-file" 命名的延遲任務來刪除這些 .delete 爲後綴的文件。延遲執行時間能夠經過 file.delete.delay.ms 進行設置

若是活躍的日誌分段中也存在須要刪除的數據時?

Kafka 會先切分出一個新的日誌分段做爲活躍日誌分段,而後執行刪除操做。

基於日誌大小

日誌刪除任務會檢查當前日誌的大小是否超過設定值。設定項爲 log.retention.bytes ,單個日誌分段的大小由 log.regment.bytes 進行設定。

刪除過程

  1. 計算須要被刪除的日誌總大小 (當前日誌文件大小-retention值)。
  2. 從日誌文件第一個 LogSegment 開始查找可刪除的日誌分段的文件集合。
  3. 執行刪除。

基於日誌起始偏移量

基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量是否大於等於日誌文件的起始偏移量,如果,則能夠刪除此日誌分段。

注意:日誌文件的起始偏移量並不必定等於第一個日誌分段的基準偏移量,存在數據刪除,可能與之相等的那條數據已經被刪除了。

圖片描述

刪除過程
  • 從頭開始變了每個日誌分段,日誌分段 1 的下一個日誌分段的起始偏移量爲 11,小於 logStartOffset,將 日誌分段 1 加入到刪除隊列中
  • 日誌分段 2 的下一個日誌分段的起始偏移量爲 23,小於 logStartOffset,將 日誌分段 2 加入到刪除隊列中
  • 日誌分段 3 的下一個日誌分段的起始偏移量爲 30,大於 logStartOffset,則不進行刪除。
相關文章
相關標籤/搜索