Kafka技術內幕-日誌壓縮

Kafka技術內幕樣章:Kafka的日誌壓縮(LogCompaction)git

原文首發:http://zqhxuyuan.github.io/2016/05/13/2016-5-13-Kafka-Book-Sample-LogCompaction/github

3.3 日誌管理類的後臺線程

分佈式存儲系統除了要保證客戶端寫請求流程的正確性,節點可能會非正常宕機或者須要重啓,在啓動的時候必需要可以正常地加載/恢復已有的數據,日誌管理類在建立的時候要加載已有的全部日誌文件,這和建立Log時要加載全部的Segment是相似的。LogManagerlogDirs參數對應了log.dirs配置項,每一個TopicPartition文件夾都對應一個Log實例,全部的Partition文件夾都在日誌目錄下,當成功加載完全部的Log實例後logs才能夠被日誌管理類真正地用在戰場上。 算法

假設logDirs=/tmp/kafka_logs1,/tmp/kafka_logs2,logs1下有[t0-0,t0-1,t1-2],logs2下有[t0-2,t1-0,t1-1],圖3-26的logDir指的是Log對象的dir,和log.dirs是不一樣的概念,能夠認爲全部Log的dir都是在每一個log.dirs下,若是把Log.dir叫作Partition級別的文件夾,則checkpoint文件和Partition文件夾是同一層級。 數據庫

3-26 logDirs and logs
圖3-26 日誌的組織方式和對應的數據結構緩存

class LogManager(val logDirs: Array[File]){
  val logs = new Pool[TopicAndPartition, Log]()
  val recoveryPointCheckpoints=logDirs.map((_,new OffsetCheckpoint(new File(_,"checkpoint"))))

  loadLogs() //啓動LogManager實例時,若是已經存在日誌文件,要把它們加載到內存中

  private def loadLogs(): Unit = {
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    for (dir <- this.logDirs) { //按照log.dirs建立線程池,若是隻配置一個目錄就只有一個線程池
      val pool = Executors.newFixedThreadPool(ioThreads)
      threadPools.append(pool)
      //checkpoint文件一個日誌目錄只有一個,並非每一個Partition級別!
      //既然全部Partition公共一個checkpoint文件,那麼文件內容固然要有Partition信息
      var recoveryPoints:Map[TopicAndPartition,Long]=recoveryPointCheckpoints(dir).read
      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList //日誌目錄下的全部文件/文件夾
        logDir <- dirContent if logDir.isDirectory //Partition文件夾,忽略日誌目錄下的文件
      } yield {
        CoreUtils.runnable { //每一個Partition文件夾建立一個線程,由線程池執行
          val topicPartition = Log.parseTopicPartitionName(logDir)
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) //分區的恢復點
          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) //恢復Log
          this.logs.put(topicPartition, current) //這裏放入logs集合中,全部分區的Log滿血復活
        }
      }
      jobsForDir.map(pool.submit).toSeq //提交任務
    }
  }

  //只有調用loadLogs後,logs纔有值,後面的操做都依賴於logs
  def allLogs(): Iterable[Log] = logs.values
  def logsByDir = logs.groupBy{case (_,log)=>log.dir.getParent}
  val cleaner: LogCleaner = new LogCleaner(cleanerConfig,logDirs,logs)

  def startup() {
    scheduler.schedule("log-retention", cleanupLogs)
    scheduler.schedule("log-flusher", flushDirtyLogs)
    scheduler.schedule("recovery-point-checkpoint",checkpointRecoveryPointOffsets)
    if(cleanerConfig.enableCleaner) cleaner.startup()
  }
}

LogManager.startup()啓動後會在後臺運行多個定時任務和線程,表3-7列舉了這些線程的方法和用途,這些線程最後都會操做Log實例(幸虧咱們已經成功地加載了logs),畢竟LogManager從名字來看就是要對Log進行管理(把checkpoint也看作是日誌文件的一部分,由於它伴隨着日誌而生,因此也在LogManager的管理範疇內)。數據結構

線程/任務 方法 做用
日誌保留任務(log retention) cleanupLogs 刪除失效的Segment或者爲了控制日誌文件大小要刪除一些文件
日誌刷寫任務(log flusher) flushDirtyLogs 根據時間策略,將還在操做系統緩存層的文件刷寫到磁盤上
檢查點刷寫任務(checkpoint) checkpointRecoveryPointOffsets 定時地將checkpoint恢復點狀態寫到文件中
日誌清理線程(cleaner) cleaner.startup() 日誌壓縮,針對帶有key的消息的清理策略

表3-7 日誌管理類的後臺線程app

日誌文件和checkpoint的刷寫flush都只是將當前最新的數據寫到磁盤上。checkpoint檢查點也叫作恢復點(顧名思義是從指定的點開始恢復數據),log.dirs的每一個目錄下只有一個全部Partition共享的全局checkpoint文件。異步

//日誌文件刷寫任務
private def flushDirtyLogs() = {
  for ((topicAndPartition, log) <- logs) {
    val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
    if(timeSinceLastFlush >= log.config.flushMs) log.flush
  }
}
//checkpoint文件刷寫任務
def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}
private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString) //checkpoint是log.dirs目錄級別
  //logsByDir對於每一個dir都有多個Partition對應的Log,因此mapValues對每一個Log獲取recoveryPoint
  recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}

//只有flush的時候纔會更新恢復點,不過flush並非每次寫都會發生的
def flush(offset: Long) : Unit = {
  if (offset <= this.recoveryPoint) return
  for(segment<-logSegments(this.recoveryPoint,offset)) //選擇恢復點和當前之間的Segment
    segment.flush() //會分別刷寫log數據文件和index索引文件(調用底層的fsync)
  if(offset > this.recoveryPoint) {
    this.recoveryPoint = offset //recoveryPoint其實是offset
    lastflushedTime.set(time.milliseconds)
  }
}

爲何全部Partition共用一個checkpoint文件,而不是每一個Partition都有本身的checkpoint文件,由於checkpoint數據量不是很大,那麼爲何前面分析的索引文件則是以Partition級別,甚至每一個Segment都有對應的數據文件和索引文件,索引自己也是offset,它和checkpoint數據量也都是不大的啊,那麼是否是也能夠每一個Partition只有一個索引文件,而不是每一個Segment一個索引文件,實際上索引文件的用途是爲了更快地查詢,該省的地方仍是要節約資源(全部Partition只有一個checkpoint文件),不應節省的仍是要大方點(每一個Segment一個索引文件),作人未嘗不是這個道理。async

3.3.1 日誌清理

清理日誌其實是清理過時的Segment,或者日誌文件太大了須要刪除最舊的數據,使得總體的日誌文件大小不超過指定的值。舉例用隊列來緩存全部的請求任務,每一個任務都有必定的存活時間,超過期間後任務就應該自動被刪除掉,同時隊列也有一個上限,不能無限制地添加任務,若是超過指定大小時,就要把最舊的任務刪除掉,以維持隊列的固定大小,這樣能夠保證隊列不至於無限大致使系統資源被耗盡。分佈式

//日誌清理任務
def cleanupLogs() {
  for(log <- allLogs; if !log.config.compact)
    cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
private def cleanupExpiredSegments(log: Log): Int = {
  log.deleteOldSegments(time.milliseconds-_.lastModified>log.config.retentionMs)
}
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else false
  }
  log.deleteOldSegments(shouldDelete)
}

Log的deleteOldSegments方法接收一個高階函數,參數是Segment,返回布爾類型表示這個Segment是否須要被刪除,在LogManager中調用的地方並無傳遞Segment,而是在Log中獲取每一個Segment。這是由於LogManager沒法跨過Log直接和Segment通訊,LogManager沒法直接管理Segment,Segment只屬於Log,只能由Log管理。

def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  //logSegments是Log的全部Segment,s是每一個Segment
  val deletable = logSegments.takeWhile(s => predicate(s))
  if(segments.size == numToDelete) roll()
  deletable.foreach(deleteSegment(_))
}
private def deleteSegment(segment: LogSegment) {
  segments.remove(segment.baseOffset) //刪除數據結構
  asyncDeleteSegment(segment) //異步刪除Segment
}
private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() = segment.delete() //和flush同樣最後調用log和index.delete
  scheduler.schedule("delete-file", deleteSeg)
}

清理日誌有兩種策略,一種是上面的cleanupLogs根據時間或大小策略(粗粒度),還有一種是針對每一個key的日誌刪除策略(細粒度)即LogCleaner方式,若是消息沒有key,那隻能採用第一種清理策略了。刪除策略是以topic爲級別的,因此不一樣的topic能夠設置不一樣的刪除策略,因此一個集羣中可能存在有些topic按照粗粒度模式刪除,有些則按照細粒度模式刪除,徹底取決於你的業務需求(固然要不要給消息設置key是一個關鍵決策)。

3.3.2 日誌壓縮

不論是傳統的RDBMS仍是分佈式的NoSQL存儲在數據庫中的數據老是會更新的,相同key的新記錄更新數據的方式簡單來講有兩種:直接更新(找到數據庫中的已有位置以最新的值替換舊的值),或者以追加的方式(保留舊的值,查詢時再合併,或者也有一個後臺線程會按期合併)。採用追加記錄的作法在節點崩潰時能夠用於恢復數據,還有一個好處是寫性能很高,由於這樣在寫的時候就不須要查詢操做,這也是表3-8中不少和存儲相關的分佈式系統都採用這種方式的緣由,它的代價就是須要有Compaction操做來保證相同key的多條記錄須要合併。

分佈式系統 更新數據追加到哪裏 數據文件 是否須要Compaction
ZooKeeper log snapshot 不須要,由於數據量不大
Redis aof rdb 不須要,由於是內存數據庫
Cassandra commit log data.db 須要,數據存在本地文件
HBase commit log HFile 須要,數據存在HDFS
Kafka commit log commit log 須要,數據存在Partition的多個Segment裏

表3-8 分佈式系統的更新操做用commit log保存

Kafka中若是消息有key,相同key的消息在不一樣時刻有不一樣的值,則只容許存在最新的一條消息,這就比如傳統數據庫的update操做,查詢結果必定是最近update的那一條,而不該該查詢出多條或者查詢出舊的記錄,固然對於HBase/Cassandra這種支持多版本的數據庫而言,update操做可能致使添加新的列,查詢時是合併的結果而不必定就是最新的記錄。圖3-27中示例了多條消息,一旦key已經存在,相同key的舊的消息會被刪除,新的被保留。

3-27 update operation
圖3-27 更新操做要刪除舊的消息

Kafka的更新操做也採用追加(commit log就是追加)也須要有Compaction操做,固然它並非像上面那樣一條消息一條消息地比較,一般Compaction是對多個文件作一次總體的壓縮,圖3-28是Log的壓縮操做先後示例,壓縮確保了相同key只存在一個最新的value,舊的value在壓縮過程會被刪除掉。

3-28 log compaction
圖3-28 LogCompaction的過程

每一個Partition的(Leader Replica的)Log有多個Segment文件,爲了避免影響正在寫的最近的那個activeSegment,日誌壓縮不該該清理activeSegment,而是清理剩下的全部Segment。清理Segment時也不是一個個Segment慢吞吞地清理,也不是一次性全部Segment想要所有清理,而是幾個Segment分紅一組,分批清理。清理線程會佔用必定的CPU,由於要讀取已有的Segment並壓縮成新的Segment,爲了避免影響其餘組件(主要是讀,由於讀操做會讀取舊的Segment,而寫不會被影響由於寫操做只往activeSegment寫,而activeSegment不會被清理),能夠設置清理線程的線程個數,同時Kakfa還支持Throttler限速(讀取舊的Segment時和寫入新的Segment均可以限速)。固然也並非每一個Partition在同一時間都進行清理,而是選擇其中最須要被清理的Partition。

清理/壓縮指的是刪除舊的更新操做,只保留最近的一個更新操做,清理方式有多種,好比JVM中的垃圾回收算法將存活的對象拷貝/整理到指定的區域,HBase/Cassandra的Compaction會將多個數據文件合併/整理成新的數據文件。Kafka的LogCleaner清理Log時會將全部的Segment在CleanerPoint清理點位置分紅Tail和Head兩部分,圖3-29中每條消息所在的Segment並無畫出來(這些消息可能在不一樣的Segment裏),由於清理是以Partition爲級別,就淡化了Segment的邊界問題,不過具體的清理動做仍是要面向Segment,由於複製消息時不得不面對Segment文件。

3-29 log tail head
圖3-29 Log包括Tail和Head兩部分

清理後Log Head部分每條消息的offset都是逐漸遞增的,而Tail部分消息的offset是斷斷續續的。LogToClean表示須要被清理的日誌,其中firstDirtyOffset會做爲Tail和Head的分界點,圖3-20中舉例了在一個Log的分界點發生Compaction的步驟。

3-30 log compaction example
圖3-30 日誌分紅Tail和Head的消息壓縮步驟

每一個Partition的Log都對應一個LogToClean對象,在選擇哪一個Partition須要優先作Compaction操做時是依據cleanableRatio的比率即Head部分大小(dirtyBytes)除於總大小中最大的,假設日誌文件同樣大,firstDirtyOffset越小,dirtyBytes就越大。而firstDirtyOffset每次Compaction後都會增長,因此實際上選擇算法是優先選擇尚未發生或者發生次數比較少的Partition,由於這樣的Partition的firstDirtyOffset沒有機會增長太多。

case class LogToClean(topicPartition: TopicAndPartition, log: Log, 
    firstDirtyOffset: Long) extends Ordered[LogToClean] {
  val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
  val dirtyBytes = log.logSegments(firstDirtyOffset, 
    math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
  val cleanableRatio = dirtyBytes / totalBytes.toDouble
  def totalBytes = cleanBytes + dirtyBytes
  override def compare(th:LogToClean)=math.signum(this.cleanableRatio-th.cleanableRatio)
}

不只僅是更新須要清理舊的數據,刪除操做也須要清理,生產者客戶端若是發送的消息key的value是空的,表示要刪除這條消息,發生在刪除標記以前的記錄都須要刪除掉,而發生在刪除標記以後的記錄則不會被刪除。日誌壓縮保證了:

  1. 任何消費者若是可以遇上Log的Head部分,它就會看到寫入的每條消息,這些消息都是順序遞增(中間不會間斷)的offset

  2. 老是維持消息的有序性,壓縮並不會對消息進行從新排序,而是移除一些消息

  3. 每條消息的offset永遠不會被改變,它是日誌文件標識位置的永久編號

  4. 讀取/消費時若是從最開始的offset=0開始,那麼至少能夠看到全部記錄按照它們寫入的順序獲得的最終狀態(狀態指的是value,相同key不一樣value,最終的狀態以最新的value爲準):由於這種場景下寫入順序和讀取順序是一致的,寫入時和讀取時offset都是不斷遞增。舉例寫入key1的value在offset=1和offst=5的值分別是v1和v2,那麼讀取到offset=1時,最終的狀態(value值)是v1,讀取到offset=5時,最終狀態是v2(不能期望說讀取到offset=1時就要求狀態是v2)

相關文章
相關標籤/搜索