最近查看Kafka文檔, 發現 Kafka 有個 Log Compaction 功能是咱們以前沒有留意到的, 可是有着很高的潛在實用價值.git
Kafka 中的每一條數據都有一對 Key 和 Value, 數據存放在磁盤上, 通常不會被永久保留, 而是在到達必定的量或者時間後對最先寫入的數據進行刪除. Log Compaction 在默認的刪除規則以外提供了另外一種刪除過期數據(或者說保留有價值的數據)的方式, 就是對於有相同 Key 的不一樣數據, 只保留最後一條, 前面的數據在合適的狀況下刪除.github
Log Compaction 特性, 就實時計算而言, 能夠在災難恢復方面有很好地應用場景. 好比說咱們在 Storm 裏作計算時, 須要長期在內存裏維護一些數據, 這些數據多是經過聚合了一天或者一週的日誌獲得的, 這些數據一旦因爲偶然的緣由(磁盤,網絡等)崩潰了, 從頭開始計算須要漫長的時間.一個可行的應對方法是定時將內存裏的數據備份到外部存儲中, 好比 Redis 或者 Mysql 等, 當崩潰發生的時候再從外部存儲讀回來繼續計算.算法
使用 Log Compaction 來代替這些外部存儲有如下好處.sql
Kafka 既是數據源又是存儲工具, 能夠簡化技術棧, 下降維護成本.apache
使用 Mysql 或者 Redis 做爲外部存儲的話, 須要將存儲的 Key 記錄下來, 恢復時再用這些 Key 將數據取回, 實現起來有必定的工程複雜度. 用Log Compaction 特性的話只要把數據一古腦兒地寫進 Kafka, 等災難恢復的時候再讀回內存就好了.網絡
Kafka 針對磁盤讀寫都有很高的順序性, 相對於 Mysql 沒有索引查詢等工做量的負擔, 能夠實現高性能, 相對於 Redis 而言, 它能夠充分利用廉價的磁盤而對內存要求很低, 在接近的性能下能實現很是高的性價比(僅僅針對災難恢復這個場景而言).ide
當 topic 的 cleanup.policy (默認爲delete) 設置爲 compact 時, Kafka 的後臺線程會定時把 topic 遍歷兩次, 第一次把每一個 key 的哈希值最後一次出現的 offset 都存下來, 第二次檢查每一個 offset 對應的 key 是否在更後面的日誌中出現過,若是出現了就刪除對應的日誌.工具
Log Compaction 的大部分功能由CleanerThread完成, 核心邏輯在 Cleaner 的 clean方法性能
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 這裏第一次遍歷全部offset將key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 這裏第二次遍歷全部offset,刪除冗餘的日誌,而且將多個小的segment合併爲一個 // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
log compaction 經過兩次遍歷全部數據來實現, 兩次遍歷之間交流的媒介就是一個
OffsetMap, 下面是OffsetMap的簽名ui
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long }
這基本就是個普通的mutable map, 在 Kafka 項目中,它的實現只有一個, 叫作SkimpyOffsetMap
put 方法會爲每一個 key 生成一份摘要,默認使用 md5 方法生成一個 16byte 的摘要, 根據這個摘要在 bytes
中哈希的到一個下標, 若是這個下標已經被別的摘要佔據, 則線性查找到下個空餘的下標爲止, 而後在對應位置插入該 key 對應的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */ override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1 }
get 方法使用和 put 一樣的摘要算法得到 key 的摘要, 經過摘要得到 offset 的存儲位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }
默認狀況下, Kafka 用 16 個 byte 存放key的摘要, 用 8 個 byte 存放摘要對應的 offset, 1GB 的空間能夠保存 1024* 1024*1024 / 24 = 44,739,242.666...
個 key 對應的數據.
這個 log compaction 的原理挺簡單, 就是按期把全部日誌讀兩遍,寫一遍, cpu 的速度超過磁盤徹底不是問題, 只要日誌的量對應的讀兩遍寫一遍的時間在可接受的範圍內, 它的性能就是能夠接受的.
如今的 OffsetMap 惟一的實現名字叫作 SkimpyOffsetMap, 相信大家已經從這個名字裏看出端倪, 最初的做者自己也認爲這樣的實現不夠嚴謹. 這個算法在兩個 key 的 md5 值相同的狀況下就判斷 key 是相同的, 若是遇到了 key 不一樣而 md5 值相同的狀況, 那兩個 key 中其中一個的消息就丟失了. 雖然 md5 值相同的機率很低, 但若是真的碰上了, 那就是100%, 機率值再低也沒用, 並且從網上的反映看彷佛衝突還很多見.
我我的目前想到的處理方案是, 大部分的 key 總長度並不算長, 能夠把這類 key 全部可能的狀況都md5一遍看一下是否有衝突, 若是沒有的話就放心用.