這個格式好點:https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txtgit
5.日誌管理器啓動 logManager.startup(){
主要是啓動一些定時任務:
a)LogManager.cleanupLogs, 根據配置清理日誌文件(cleanupExpiredSegments[根據時間], cleanupSegmentsToMaintainSize[根據大小]).
Log.deleteOldSegments {
按時間和按大小邏輯相似, 按時間則是根據每一個Segment的最後修改時間判斷; 按大小, 則是依次累加每一個LogSegment的大小, 當累計大小大於配置大小後的全部Segment都刪除.
注意, 這裏不刪除當前活動的Segment
獲取到要刪除的Segment後, 依次調用kafka.log.Log.deleteSegment方法{
首先將Segment從Log.segments中刪除,
而後重命名Segment的index和log文件爲.deleted後綴
而後建立異步任務, 異步的刪除Segment文件
}
}
b)LogManager.flushDirtyLogs, 根據配置定時刷盤, 刷盤後, 會更新恢復點(log.recoveryPoint).
LogManager.flushDirtyLogs{
遍歷logs
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
if(timeSinceLastFlush >= log.config.flushMs){
log.flush
}
}
Log.flush{
經過treemap找到最新offset和上一次recoveryPoint(恢復點)的Segment, 依次調用Segment.flush方法, 內部調用index和logFile的force方法刷盤
而後記錄最新的recoveryPoint(恢復點), 更新lastflushedTime
}
c)LogManager.checkpointRecoveryPointOffsets, 根據配置, 將每一個Log.recoveryPoint刷盤, 建立checkpointRecoveryPointOffset文件, 用途前面2.4.1有說.
而後, 啓動cleaner線程
cleaner.startup() {
大概做用就是, 清理日誌, 具體方法是, 對同一個key的消息, 保留offset最大的消息體, 其餘的丟棄, 對於空消息體, 則表示刪除.
能夠看到, 適合某些場景, 好比說用kafka來同步某個屬性的值, 每變更一次, 就發一次消息, 消費者更新爲最新值, 這種狀況, 新的客戶端對於某一key只須要獲取最新的一條消息的值就好了.
有點相似於redis的aof rewrite.
而後有時候清理後, 日誌文件變得很小, 就須要合併多個日誌文件.
由於清理過程須要大量的IO操做, 因此經過Throttler類來對限速, 防止對正常的寫日誌產生影響
由於清理是一個長時間的過程, 且是多線程的, 因此須要有個地方標記這個分區正在清理, 因此就有了LogCleanerManager, 來存放狀態信息
內容比較多, 具體見5.1
}
}github
5.1.Cleaner線程 {
根據配置, 建立LogCleaner, 若是cleanerConfig.enableCleaner == true
new LogCleaner(cleanerConfig, logDirs, logs, time = time) {
建立清理管理器, 主要負責維護分區在clean過程當中的狀態信息
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
限流器, 防止clean佔用的IO可控, 線程安全, 多個清理線程共用一個.
private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, checkIntervalMs = 300, throttleDown = true, "cleaner-io", "bytes", time = time)
//清理線程
val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
}
先看和業務關係不大的Throttler{
累加計數器, 而後判斷時間是否是該檢查速度了,
若是要檢查速度, 就算出當前的速度, 和配置的數度對比, 看看是否是須要調整
若是須要調整, 具體算法思路爲下:
具體問題也抽象爲這樣的問題, 兩我的以不一樣的速度走了一秒鐘, 求, 一秒鐘後, 快的人停多久, 才能等到慢的人.
具體解法就是, 根據速度和時間, 算出距離差距, 而後經過 (慢人的速度 / 距離差距 = 慢的人要趕的時間 = 快的人要等的時間)
這裏kafka的實現相似, 不過省略了中間過程, 因此比較難看懂, 具體能夠當作下面代碼
if(needAdjustment) {
val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble //限制的速度(慢人速度)
val elapsedMs = elapsedNs.toDouble / Time.NsPerMs //已經跑了多久時間
val diffBytes = observedSoFar - (desiredRateMs * elapsedMs) //快人跑的距離 - (慢人速度 * 已經跑了多久時間 = 慢人跑的距離) = 差距
val newSleepTime = round(diffBytes / desiredRateMs) //差距 / 慢人速度 = 要等待的時間
if(newSleepTime > 0) {
time.sleep(newSleepTime)
}
}
把中間過程化簡後, 就是kafka的寫法
}
在來看看維護清理狀態的 new LogCleanerManager(logDirs, logs) {
用來保存清理進度快照, 保存的是每一個分區上次清理到的位置(firstDirtyOffset), firstDirtyOffset以前的表示已經清理過了, 每次清理完成會更新快照文件.
寫:當clean完成的時候, 會調用LogCleanerManager.doneCleaning來更新分區最新的offset快照
讀:Clean線程中, 會不斷經過LogCleanerManager.grabFilthiestLog()來獲取髒日誌, 而判斷髒日誌, 是經過獲取到快照中記錄的firstDirtyOffset, 從而計算出髒日誌大小, 來判斷是否須要清理.
清理須要策略, 若是不記錄清理進度, 則每次都作所有的掃描, 低效, 因此經過保存clean進度快照來記錄清理進度, 減小沒必要要的清理.
val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
一個Map, 用來記錄分區正在進行中的clean狀態信息. 有三個狀態(LogCleaningInProgress, LogCleaningAborted, LogCleaningPaused)
當判斷到分區髒日誌大於閥值時, 會進入LogCleaningInProgress狀態, 開始清理, 當清理完成時, 更新快照, 講分區從inProgress中移除.
LogCleaningInProgress => LogCleaningAborted : 日誌截取(truncate)時, 須要等待Clean完成, 且開始日誌截取後, 就不能進行Clean, 須要等待日誌截取完成, 才能繼續Clean, 這個是經過LogCleaningAborted狀態來實現的
過程是這樣的, 當日志截取時, 若是正在進行Clean, 則LogCleaningInProgress => LogCleaningAborted, 而後等待狀態到LogCleaningPaused, 當Clean完成時, 會進行LogCleaningAborted => LogCleaningPaused
這樣就實現了截取日誌時, Clean已經完成, 且分區被標記爲LogCleaningPaused狀態, 這樣Clean線程後續也會忽略這個分區, 完成日誌截取後, 經過CleanerManager.resumeCleaning方法, 將分區狀態信息從Map中移除, 等待下次clean
一樣, 刪除Log時, 也要作相似操做, 這時, 先將Log實例從Log池中移除, 而後保證Clean已經完成, 由於Log已經再也不池中了, 因此下次Clean也會再Clean了, 因此這裏用的是Cleaner.abortCleaning, 裏面組合了abortAndPauseCleaning, resumeCleaning
val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
好了, LogCleanerManager基本功能清楚了, 接下來回到LogCleaner
}
最好看看Clean線程 CleanerThread extends ShutdownableThread {
ShutdownableThread裏面的run方法裏循環調用子類的doWork方法, 因此具體邏輯咱們看doWork方法
應該是具體Clean操做邏輯的類
val cleaner = new Cleaner() {
清理過程當中的狀態信息, 如開始時間, 讀取字節數等信息, 一個用來表是當前的狀態, 一個用來表示上一次完成的狀態
val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
用來合併同Key消息的Map, 主要是索引用, 先讀一遍髒日誌, 創建Map
offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm){
既然是hash, 就先看hash算法吧
private def positionOf(hash: Array[Byte], attempt: Int): Int = {
具體就是hash以後的值是一個byte數組, 當衝突次數小於(hashSize - 4)時, 取Int(hash_byte[attempt, attempt + 3]),
當衝突次數高於(hashSize - 4)時, 取Int(hash_byte[12, hashSize]) + (attempt - hashSize + 4)
即先求hash, 而後衝突時, 使用簡單的策略來繼續新的hash值, 來計算新的位置
}
內部維護一個ByteBuffer, 不存儲原始Key, 只存key的hash值, 碰撞後, 繼續新的位置, 讀取時相似, 讀取, 比對, hash不相對, 計算新的hash, 再讀取比對
碰撞挺高的, 越到後面, 插入效率越低, 碰撞高了, 讀取效率也較低, 不支持刪除(1000元素的容量, 插滿後, 碰撞率爲19左右)
}
}
doWork邏輯 cleanOrSleep {
調用cleanerManager.grabFilthiestLog(), 掃描全部Log獲取要清理的分區, 具體條件爲: 1)Log配置開啓compact, 2)沒有處於inProgress中, 3)髒日誌大於minCleanableRatio比例, 4)取最髒的那個.
grabFilthiestLog返回LogToClean對象, 裏面包含髒日誌比例, 字節數, firstDirtyOffset等信息
清理邏輯 cleaner.clean(cleanable) {
清理Clean過程當中的統計信息stats.clear(), 準備開始新的Clean過程
先構建出Key索引Map, 清理範圍是[firstDirtyOffset, activeSegment.baseOffset], 即清理除活動segment外的段
val upperBoundOffset = log.activeSegment.baseOffset
val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1 {
map.clear(), 清空offsetMap, 準備建索引
根據offsetMap容量和負載因子, 計算出最多能清理到的offset(minStopOffset)
而後遍歷segment, 讀取文件(有限流), 構建map, buildOffsetMapForSegment(){
不細說, 具體就是讀取文件內容到buffer, 而後經過ByteBufferMessageSet來遍歷消息, 創建map, 若是buffer不夠讀一條消息的, 就增大, 繼續讀
返回索引的最後一條消息的offset, 處理完後, 會恢復buffer
}
返回索引的最後一條消息的offset, 加1以後, 就是真實的清理上界offset(不含自身)
}
val deleteHorizonMs = 計算一個時間點, 用來判斷消息是否須要忽略(刪除), 即壓縮的過程同時作清理的工做, 根據配置, 獲取已清理過的最大offset的文件的修改時間, 沒有文件則取0
而後將offset[0, endOffset]的segment拿出來處理(即0到offsetMap中已索引的最大的offset), 根據大小進行group操做, 由於之前可能進行過clean操做, 致使segment可能變小, 這裏group成配置指定的大小, 再處理.
group後的結構是一個List<List<Segment>>, 內層的List是要合併的segement
遍歷group, 對裏面的每一個組作clean操做 cleanSegments(log, group, offsetMap, deleteHorizonMs){
接下來就是讀老文件, 合併同key的offset, 在寫到臨時文件, 最後重命名文件, 刪除老文件, 換成新文件, 具體4.1有提到
先建立log和index的臨時文件(.cleaned), 若是存在, 則表示上次處理到一半的文件, 直接刪除, 處理過的消息, 先寫到.clean文件中, 防止寫到一半掛了, 保證一致完整性.
同時建立對應的OffsetIndex和LogSegment對象
而後對每一個segment進行處理 cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes){
處理分區前, 先檢查分區是否是處於LogCleaningAborted狀態(truncate時會進入這個狀態, 具體看5.1), 是就拋異常結束這裏處理
而後清理readbuffer, writebuffer, 而後藉助ByteBufferMessageSet對象來讀取消息, 中間會判斷readbuffer是否是過小(1條msg都讀不到), 過小就擴容和前面buildOffsetMap相似,
先吧消息讀到readbuffer裏面(即一次讀多條, 不夠一條消息就須要擴容), 再對於每條消息, 獲取到key和offset, 而後根據前面構建的OffsetMap來判斷這條消息是否是留(offset要大於或等於OffsetMap裏面的(map裏面是最大的offset)),
也會判斷這個文件是否是達到了刪除的條件, 要刪的文件, 則消息不用處理, 最後, 若是消息體爲空, 表示要忽略這條消息
將消息寫到writebuffer裏面去, 由於readbuffer和writebuffer同樣大, 且擴容的時候一塊兒擴容的, 因此不怕writebuffer大小不夠, 而後將writebuffer寫到segment中去
最後還原readbuff, writebuffer, clean過程當中, 也會有相似於進度收集, IO限速等控制
}
好了, 一組segment已經clean成一個了, 接着trim一下index文件到真實的大小, 而後flush日誌和索引到文件, 接着恢復一下文件的lastModified, 由於不恢復會致使原本達到刪除時間的文件繼續存在
最後, 交換一下, 用新的segment替換老的segment, log.replaceSegments(cleaned, segments){
具體4.1有提到, 先將新的segment文件的後綴從.cleaned重命名到.swap文件, 重命名成功後, 若是進程掛了, 啓動時loadSegments會完成後續的操做
若是剛重命名爲.swap, 老的log尚未刪除掉, 則下次啓動的時候, loadSegments中重命名會失敗, 致使啓動失敗, 能夠手動刪除老的log文件再啓動
將新的segment放入到log.segments中, 將老的一個個移除(除了剛纔put的, put新的seg至關於刪除了這個老的seg, 因此這裏不移除了),
而後將老的文件重命名爲.deleted文件, 異步線程中刪除老的segment文件 , 最後, 將新的segment重命名, 去掉swap後綴, 完成clean
}
}cleanSegments
} cleaner.clean
日誌記錄 recordStats, 記錄上次Clean狀態this.lastStats = stats, 順便交換Clean狀態類(Cleaner.statsUnderlying.swap)
好了, clean操做總數是結束了
}cleanOrSleep
}CleanerThread
}redis
6.LogManager總結{
首先, kafka對於一個topic, 會分爲多個partition, 一個partition一個文件夾, 分區下面又分爲多個segment, segment中又分爲log和index, .log文件是最終的消息存放文件
log文件負責消息的讀寫, index負責index的讀寫, segment聚合index和log, 提供一個統一的讀寫接口, 屏蔽索引等相關的操做細節,
而Log聚合segment, 維護lastOffset, 統一flush操做, 屏蔽掉文件滾動操做的細節, 提供一個更加上層的接口, 屏蔽底層文件的讀寫.
LogManager則負責提供快照, 日誌Clean, 相關定時任務管理等功能, 外部經過LogManager來獲取Log對象讀寫消息
}
算法