Kafka技術內幕樣章:Kafka的日誌壓縮(LogCompaction)git
原文首發:http://zqhxuyuan.github.io/2016/05/13/2016-5-13-Kafka-Book-Sample-LogCompaction/github
分佈式存儲系統除了要保證客戶端寫請求流程的正確性,節點可能會非正常宕機或者須要重啓,在啓動的時候必需要可以正常地加載/恢復已有的數據,日誌管理類在建立的時候要加載已有的全部日誌文件,這和建立Log時要加載全部的Segment是相似的。LogManager
的logDirs
參數對應了log.dirs
配置項,每一個TopicPartition文件夾都對應一個Log實例,全部的Partition文件夾都在日誌目錄下,當成功加載完全部的Log實例後logs才能夠被日誌管理類真正地用在戰場上。 算法
假設logDirs=/tmp/kafka_logs1,/tmp/kafka_logs2
,logs1下有[t0-0,t0-1,t1-2],logs2下有[t0-2,t1-0,t1-1],圖3-26的logDir指的是Log對象的dir,和log.dirs是不一樣的概念,能夠認爲全部Log的dir都是在每一個log.dirs下,若是把Log.dir叫作Partition級別的文件夾,則checkpoint文件和Partition文件夾是同一層級。 數據庫
圖3-26 日誌的組織方式和對應的數據結構緩存
class LogManager(val logDirs: Array[File]){ val logs = new Pool[TopicAndPartition, Log]() val recoveryPointCheckpoints=logDirs.map((_,new OffsetCheckpoint(new File(_,"checkpoint")))) loadLogs() //啓動LogManager實例時,若是已經存在日誌文件,要把它們加載到內存中 private def loadLogs(): Unit = { val threadPools = mutable.ArrayBuffer.empty[ExecutorService] for (dir <- this.logDirs) { //按照log.dirs建立線程池,若是隻配置一個目錄就只有一個線程池 val pool = Executors.newFixedThreadPool(ioThreads) threadPools.append(pool) //checkpoint文件一個日誌目錄只有一個,並非每一個Partition級別! //既然全部Partition公共一個checkpoint文件,那麼文件內容固然要有Partition信息 var recoveryPoints:Map[TopicAndPartition,Long]=recoveryPointCheckpoints(dir).read val jobsForDir = for { dirContent <- Option(dir.listFiles).toList //日誌目錄下的全部文件/文件夾 logDir <- dirContent if logDir.isDirectory //Partition文件夾,忽略日誌目錄下的文件 } yield { CoreUtils.runnable { //每一個Partition文件夾建立一個線程,由線程池執行 val topicPartition = Log.parseTopicPartitionName(logDir) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) //分區的恢復點 val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) //恢復Log this.logs.put(topicPartition, current) //這裏放入logs集合中,全部分區的Log滿血復活 } } jobsForDir.map(pool.submit).toSeq //提交任務 } } //只有調用loadLogs後,logs纔有值,後面的操做都依賴於logs def allLogs(): Iterable[Log] = logs.values def logsByDir = logs.groupBy{case (_,log)=>log.dir.getParent} val cleaner: LogCleaner = new LogCleaner(cleanerConfig,logDirs,logs) def startup() { scheduler.schedule("log-retention", cleanupLogs) scheduler.schedule("log-flusher", flushDirtyLogs) scheduler.schedule("recovery-point-checkpoint",checkpointRecoveryPointOffsets) if(cleanerConfig.enableCleaner) cleaner.startup() } }
LogManager.startup()啓動後會在後臺運行多個定時任務和線程,表3-7列舉了這些線程的方法和用途,這些線程最後都會操做Log實例(幸虧咱們已經成功地加載了logs),畢竟LogManager從名字來看就是要對Log進行管理(把checkpoint也看作是日誌文件的一部分,由於它伴隨着日誌而生,因此也在LogManager的管理範疇內)。數據結構
線程/任務 | 方法 | 做用 |
---|---|---|
日誌保留任務(log retention) | cleanupLogs | 刪除失效的Segment或者爲了控制日誌文件大小要刪除一些文件 |
日誌刷寫任務(log flusher) | flushDirtyLogs | 根據時間策略,將還在操做系統緩存層的文件刷寫到磁盤上 |
檢查點刷寫任務(checkpoint) | checkpointRecoveryPointOffsets | 定時地將checkpoint恢復點狀態寫到文件中 |
日誌清理線程(cleaner) | cleaner.startup() | 日誌壓縮,針對帶有key的消息的清理策略 |
表3-7 日誌管理類的後臺線程app
日誌文件和checkpoint的刷寫flush
都只是將當前最新的數據寫到磁盤上。checkpoint檢查點也叫作恢復點(顧名思義是從指定的點開始恢復數據),log.dirs的每一個目錄下只有一個全部Partition共享的全局checkpoint文件。異步
//日誌文件刷寫任務 private def flushDirtyLogs() = { for ((topicAndPartition, log) <- logs) { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime if(timeSinceLastFlush >= log.config.flushMs) log.flush } } //checkpoint文件刷寫任務 def checkpointRecoveryPointOffsets() { this.logDirs.foreach(checkpointLogsInDir) } private def checkpointLogsInDir(dir: File): Unit = { val recoveryPoints = this.logsByDir.get(dir.toString) //checkpoint是log.dirs目錄級別 //logsByDir對於每一個dir都有多個Partition對應的Log,因此mapValues對每一個Log獲取recoveryPoint recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) } //只有flush的時候纔會更新恢復點,不過flush並非每次寫都會發生的 def flush(offset: Long) : Unit = { if (offset <= this.recoveryPoint) return for(segment<-logSegments(this.recoveryPoint,offset)) //選擇恢復點和當前之間的Segment segment.flush() //會分別刷寫log數據文件和index索引文件(調用底層的fsync) if(offset > this.recoveryPoint) { this.recoveryPoint = offset //recoveryPoint其實是offset lastflushedTime.set(time.milliseconds) } }
爲何全部Partition共用一個checkpoint文件,而不是每一個Partition都有本身的checkpoint文件,由於checkpoint數據量不是很大,那麼爲何前面分析的索引文件則是以Partition級別,甚至每一個Segment都有對應的數據文件和索引文件,索引自己也是offset,它和checkpoint數據量也都是不大的啊,那麼是否是也能夠每一個Partition只有一個索引文件,而不是每一個Segment一個索引文件,實際上索引文件的用途是爲了更快地查詢,該省的地方仍是要節約資源(全部Partition只有一個checkpoint文件),不應節省的仍是要大方點(每一個Segment一個索引文件),作人未嘗不是這個道理。async
清理日誌其實是清理過時的Segment,或者日誌文件太大了須要刪除最舊的數據,使得總體的日誌文件大小不超過指定的值。舉例用隊列來緩存全部的請求任務,每一個任務都有必定的存活時間,超過期間後任務就應該自動被刪除掉,同時隊列也有一個上限,不能無限制地添加任務,若是超過指定大小時,就要把最舊的任務刪除掉,以維持隊列的固定大小,這樣能夠保證隊列不至於無限大致使系統資源被耗盡。分佈式
//日誌清理任務 def cleanupLogs() { for(log <- allLogs; if !log.config.compact) cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) } private def cleanupExpiredSegments(log: Log): Int = { log.deleteOldSegments(time.milliseconds-_.lastModified>log.config.retentionMs) } private def cleanupSegmentsToMaintainSize(log: Log): Int = { var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) { diff -= segment.size true } else false } log.deleteOldSegments(shouldDelete) }
Log的deleteOldSegments方法接收一個高階函數,參數是Segment,返回布爾類型表示這個Segment是否須要被刪除,在LogManager中調用的地方並無傳遞Segment,而是在Log中獲取每一個Segment。這是由於LogManager沒法跨過Log直接和Segment通訊,LogManager沒法直接管理Segment,Segment只屬於Log,只能由Log管理。
def deleteOldSegments(predicate: LogSegment => Boolean): Int = { //logSegments是Log的全部Segment,s是每一個Segment val deletable = logSegments.takeWhile(s => predicate(s)) if(segments.size == numToDelete) roll() deletable.foreach(deleteSegment(_)) } private def deleteSegment(segment: LogSegment) { segments.remove(segment.baseOffset) //刪除數據結構 asyncDeleteSegment(segment) //異步刪除Segment } private def asyncDeleteSegment(segment: LogSegment) { segment.changeFileSuffixes("", Log.DeletedFileSuffix) def deleteSeg() = segment.delete() //和flush同樣最後調用log和index.delete scheduler.schedule("delete-file", deleteSeg) }
清理日誌有兩種策略,一種是上面的cleanupLogs根據時間或大小策略(粗粒度),還有一種是針對每一個key的日誌刪除策略(細粒度)即LogCleaner方式,若是消息沒有key,那隻能採用第一種清理策略了。刪除策略是以topic爲級別的,因此不一樣的topic能夠設置不一樣的刪除策略,因此一個集羣中可能存在有些topic按照粗粒度模式刪除,有些則按照細粒度模式刪除,徹底取決於你的業務需求(固然要不要給消息設置key是一個關鍵決策)。
不論是傳統的RDBMS仍是分佈式的NoSQL存儲在數據庫中的數據老是會更新的,相同key的新記錄更新數據的方式簡單來講有兩種:直接更新(找到數據庫中的已有位置以最新的值替換舊的值),或者以追加的方式(保留舊的值,查詢時再合併,或者也有一個後臺線程會按期合併)。採用追加記錄的作法在節點崩潰時能夠用於恢復數據,還有一個好處是寫性能很高,由於這樣在寫的時候就不須要查詢操做,這也是表3-8中不少和存儲相關的分佈式系統都採用這種方式的緣由,它的代價就是須要有Compaction操做來保證相同key的多條記錄須要合併。
分佈式系統 | 更新數據追加到哪裏 | 數據文件 | 是否須要Compaction |
---|---|---|---|
ZooKeeper | log | snapshot | 不須要,由於數據量不大 |
Redis | aof | rdb | 不須要,由於是內存數據庫 |
Cassandra | commit log | data.db | 須要,數據存在本地文件 |
HBase | commit log | HFile | 須要,數據存在HDFS |
Kafka | commit log | commit log | 須要,數據存在Partition的多個Segment裏 |
表3-8 分佈式系統的更新操做用commit log保存
Kafka中若是消息有key,相同key的消息在不一樣時刻有不一樣的值,則只容許存在最新的一條消息,這就比如傳統數據庫的update操做,查詢結果必定是最近update的那一條,而不該該查詢出多條或者查詢出舊的記錄,固然對於HBase/Cassandra這種支持多版本的數據庫而言,update操做可能致使添加新的列,查詢時是合併的結果而不必定就是最新的記錄。圖3-27中示例了多條消息,一旦key已經存在,相同key的舊的消息會被刪除,新的被保留。
圖3-27 更新操做要刪除舊的消息
Kafka的更新操做也採用追加(commit log就是追加)也須要有Compaction操做,固然它並非像上面那樣一條消息一條消息地比較,一般Compaction是對多個文件作一次總體的壓縮,圖3-28是Log的壓縮操做先後示例,壓縮確保了相同key只存在一個最新的value,舊的value在壓縮過程會被刪除掉。
圖3-28 LogCompaction的過程
每一個Partition的(Leader Replica的)Log有多個Segment文件,爲了避免影響正在寫的最近的那個activeSegment,日誌壓縮不該該清理activeSegment,而是清理剩下的全部Segment。清理Segment時也不是一個個Segment慢吞吞地清理,也不是一次性全部Segment想要所有清理,而是幾個Segment分紅一組,分批清理。清理線程會佔用必定的CPU,由於要讀取已有的Segment並壓縮成新的Segment,爲了避免影響其餘組件(主要是讀,由於讀操做會讀取舊的Segment,而寫不會被影響由於寫操做只往activeSegment寫,而activeSegment不會被清理),能夠設置清理線程的線程個數,同時Kakfa還支持Throttler限速(讀取舊的Segment時和寫入新的Segment均可以限速)。固然也並非每一個Partition在同一時間都進行清理,而是選擇其中最須要被清理的Partition。
清理/壓縮指的是刪除舊的更新操做,只保留最近的一個更新操做,清理方式有多種,好比JVM中的垃圾回收算法將存活的對象拷貝/整理到指定的區域,HBase/Cassandra的Compaction會將多個數據文件合併/整理成新的數據文件。Kafka的LogCleaner清理Log時會將全部的Segment在CleanerPoint清理點位置分紅Tail和Head兩部分,圖3-29中每條消息所在的Segment並無畫出來(這些消息可能在不一樣的Segment裏),由於清理是以Partition爲級別,就淡化了Segment的邊界問題,不過具體的清理動做仍是要面向Segment,由於複製消息時不得不面對Segment文件。
圖3-29 Log包括Tail和Head兩部分
清理後Log Head部分每條消息的offset都是逐漸遞增的,而Tail部分消息的offset是斷斷續續的。LogToClean
表示須要被清理的日誌,其中firstDirtyOffset會做爲Tail和Head的分界點,圖3-20中舉例了在一個Log的分界點發生Compaction的步驟。
圖3-30 日誌分紅Tail和Head的消息壓縮步驟
每一個Partition的Log都對應一個LogToClean對象,在選擇哪一個Partition須要優先作Compaction操做時是依據cleanableRatio的比率即Head部分大小(dirtyBytes)除於總大小中最大的,假設日誌文件同樣大,firstDirtyOffset越小,dirtyBytes就越大。而firstDirtyOffset每次Compaction後都會增長,因此實際上選擇算法是優先選擇尚未發生或者發生次數比較少的Partition,由於這樣的Partition的firstDirtyOffset沒有機會增長太多。
case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] { val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum val cleanableRatio = dirtyBytes / totalBytes.toDouble def totalBytes = cleanBytes + dirtyBytes override def compare(th:LogToClean)=math.signum(this.cleanableRatio-th.cleanableRatio) }
不只僅是更新須要清理舊的數據,刪除操做也須要清理,生產者客戶端若是發送的消息key的value是空的,表示要刪除這條消息,發生在刪除標記以前的記錄都須要刪除掉,而發生在刪除標記以後的記錄則不會被刪除。日誌壓縮保證了:
任何消費者若是可以遇上Log的Head部分,它就會看到寫入的每條消息,這些消息都是順序遞增(中間不會間斷)的offset
老是維持消息的有序性,壓縮並不會對消息進行從新排序,而是移除一些消息
每條消息的offset永遠不會被改變,它是日誌文件標識位置的永久編號
讀取/消費時若是從最開始的offset=0開始,那麼至少能夠看到全部記錄按照它們寫入的順序獲得的最終狀態(狀態指的是value,相同key不一樣value,最終的狀態以最新的value爲準):由於這種場景下寫入順序和讀取順序是一致的,寫入時和讀取時offset都是不斷遞增。舉例寫入key1的value在offset=1和offst=5的值分別是v1和v2,那麼讀取到offset=1時,最終的狀態(value值)是v1,讀取到offset=5時,最終狀態是v2(不能期望說讀取到offset=1時就要求狀態是v2)