https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txtjava
源碼閱讀(0.8.2.2):git
(一)概覽github
1.調用kafka.Kafka中的main方法啓動json
2.經過啓動參數獲取配置文件的路徑api
3.經過System.getProperty(log4j.configuration)來獲取日誌配置併發
4.加載配置文件, 校驗配置app
5.根據配置啓動指標導出任務
KafkaMetricsReporter.startReporters(serverConfig.props)
根據配置kafka.metrics.reporters, kafka.metrics.polling.interval.secs, 來初始化指標報告類,
內部是使用的com.yammer.metrics來作指標收集(最新的版本更名爲了io.dropwizard.metrics),
Kafka提供一個內部實現kafka.metrics.KafkaCSVMetricsReporter, 內部又實用CsvReporter(com.yammer.metrics提供), 將指標信息寫出到指定目錄下的csv文件, 順便註冊個MBean.dom
6.調用內部的啓動類
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
kafkaServerStartable.startup異步
7.註冊shutdown hook, 等待shutdown信號(阻塞), 調用shutdown方法關閉.fetch
(二)KafkaServerStartable.startup啓動邏輯
1.設置broker狀態爲Starting, 初始話shutdown信號(CountDownLatch)和狀態(isShuttingDown)
2.kafkaScheduler.startup()
啓動kafka調度任務線程池, 方法內部爲初始化ScheduledThreadPoolExecutor, 其實就是ScheduledThreadPoolExecutor的一個包裝, 注意這裏的線程是使用daemon類型
3.zkClient = initZk()初始化zkClient連接
這裏能夠設置 `zookeeper.connect=localhost:2181/kafka` 這樣的url, kafka會將kafka相關的zk路徑創建在/kafka下, 方便一個zk註冊多個kafka集羣.
4.初始化LogManager, logManager = KafkaServer.createLogManager(zkClient, brokerState){
首先, 根據配置文件中的配置建立LogConfig對象,
而後, 從zk上面獲取全部topic的配置, 合併配置(zk上的配置覆蓋文件中的配置)
AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
從/brokers/topics下獲取全部的topic, 而後循環/config/topics/xxx獲取xxx的配置(json中config屬性)
根據配置建立CleanerConfig對象
而後建立LogManager實例, new LogManager(){
private val logs = new Pool[TopicAndPartition, Log]()
初始化Log池Pool[併發map的包裝]
createAndValidateLogDirs(logDirs)
建立日誌目錄
private val dirLocks = lockLogDirs(logDirs)
初始化文件鎖FileLock,內部JDK的文件鎖
初始化OffsetCheckpoint對象(saves out a map of topic/partition=>offsets to a file)
recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
OffsetCheckpoint初始化會刪除對應的.tmp文件, 並建立對應的文件(若是不存在).
OffsetCheckpoint文件格式爲:
第一行: 文件格式版本號, 目前是隻有一個版本(0).
第二行: 文件包含的記錄數.
其餘行: 具體的快照信息, 格式爲, `topic partition offset`, 如 haogrgr 0 100.
OffsetCheckpoint寫是先寫.tmp文件, 而後再rename操做, 最後刷盤(writer.flush();fileOutputStream.getFD().sync()).
loadLogs(){
具體見下面的2.4.1.加載日誌 loadLogs().
}
}
}
4.1.加載日誌 loadLogs() {
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
首先, 爲每一個日誌dir建立一個線程池來異步執行初始化任務
遍歷所有的dir列表{
首先, 爲每一個日誌dir, 關聯一個線程池(線程數num.recovery.threads.per.data.dir), 用來初始化Log實例, 方法執行完畢即關閉
而後, 經過日誌目錄下的CleanShutdownFile文件來判斷是否爲正常關閉, 正常關閉的時候(LogManager.shutdown方法裏面), 會建立該文件, 表示正常關閉,
非正常關閉, 將狀態設置爲RecoveringFromUncleanShutdown
(大概看了下, 後續的Log.loadSegments會檢查CleanShutdownFile, 而後初始化完成後進行Log.recoverLog操做, 細節TODO)
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
recoveryPoints是一個map[topic_partition, offset], kafka在正常關閉, 或定時任務, 或者清理日誌的時候(細節TODO), 會將當前每一個分區的最新的offset寫到快照文件中,
這裏讀取文件, 獲取每一個分區的快照信息(offset), recoveryPoint在Log對象中, 保存的是已經flush的最大的offset值, 在log.flush中, 刷盤後會更新該值, 即小於等於recoveryPoint的消息都是落盤了的.
主要做用是: 減小恢復時日誌的掃描數量; 經過(logEndOffset - recoveryPoint)能夠獲得未刷盤消息數, 作刷盤控制;
對與日誌dir下的每一個目錄(topic-partition目錄)建立初始化Log對象的任務 Utils.runnable {
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
首先經過目錄名解析出來topic和partition
而後, 獲取topic配置類(根據前面2,4中zk上的配置和默認配置合併), 同時獲取 recoveryPoint值
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) {
建立Log對象實例
private val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
Log對象屬性, 用來存放segment對象, LogSegment表示分區下的日誌文件及其對應的索引文件.
loadSegments() {
初始化分區下的全部LogSegment對象
首先建立日誌目錄(若是不存在)
而後遍歷日誌目錄下全部文件 {
刪除全部以[.deleted]結尾的文件(log和index), 何時會產生該後綴的文件?
a)根據配置, kafka會刪除一些舊日誌(LogSegment)(retentionMs, retentionSize), 定時任務LogManager.cleanupLogs;
b)日誌恢復操做, Log.recoverLog, 當非正常關閉kafka時, 會恢復日誌, 一旦發現不正常的日誌, 這個offset(含)以後的字節和LogSegment都會被刪除;
c)主從同步時, 當從落後太多(從的最大offset小於主的最小offset(可能日誌會被清理了)), 則從會logManager.truncateFullyAndStartAt, 來刪除老的日誌, 重新的offset開始;
d)主從同步時, 因爲分區Leader的變化, 以前和舊Leader同步的數據可能不是最新的, 須要刪除highWatermark(offset)(TODO)以後的數據, 防止不一致, ReplicaManager.makeFollowers;
刪除步驟爲:
a)先講要刪除的LogSegment從log.segments中移除;
b)再重命名日誌和索引文件名後綴爲.deleted;
c)最後提交異步任務, 任務中再刪除日誌和對應的索引文件.
刪除全部以[.cleaned]結尾的文件(log和index), 何時會產生該後綴的文件?
a)Cleaner.clean中, 會將多個Segment清理成一個Segment, 而後交換到Log.segments中(清理:同key的消息, 去最後的value), 交換過程當中, 先是將多個Segment中的日誌(合併同key消息)寫入到.cleaned文件中, 寫完後, 重命名爲.swap文件, 而後刪除老Segment文件, 最後去掉.swap後綴;
能夠看到, 交換步驟爲分爲三步, 第一步先寫.cleaned文件, 保證文件所有清理完後再操做, 而後重命名爲.swap文件, 這時能夠刪除老的文件了, 刪除操做參考上面的.deleted文件操做, 最後重命名, 去掉.swap後綴, 中間任何一步異常, 都不會破壞文件完整性.
一個疑惑: 當重命名爲.swap成功, 可是立刻carsh了, 致使老的log沒有移除, 那麼下次啓動時, 老的日誌依然存在, 如何處理(猜想: 由於是clean, 因此只會clean達到調節的log, 下次啓動會繼續clean操做, 待驗證TODO)
處理.swap文件, 如上面說的, 當swap操做進行到一半而掛掉了, 就可能會有.swap文件, 這裏須要完成swap操做, 重命名去掉.swap後綴, 刪除索引, 後續會判斷相關的log文件是否有對應的index文件, 沒有會重建索引文件.
}
而後再次遍歷日誌目錄下全部文件{
首先, 刪除沒有對應log文件的index文件.
而後, 若是爲log文件, 則建立LogSegment對象, 若是沒有對應的index文件, 則重建LogSegment.recover, 而後將segment放入到log的segments中去, key爲文件名(startOffset).
重建索引文件 LogSegment.recover {
遍歷log文件, 每隔指定間隔字節數, 就在索引文件中添加一條索引, 最後設置log和index文件大小爲有效的字節數
}
則建立LogSegment對象 segment = new LogSegment {
建立FileMessageSet對象{
這裏調用的是def this(file: File)這個構造方法, 內部會調用FileMessageSet(file, new RandomAccessFile(file, "rw").getChannel(), 0, Int.MaxValue, false)
這裏經過RandomAccessFile來獲取到對應的FileChannel, 提供相似於切片的功能, 經過維護start, end, isSlice來實現, 提供iterator方式來遍歷整個日誌文件.
消息添加是經過ByteBufferMessageSet.writeTo來從buffer寫到文件channel的.
這個類主要提供Log文件的讀寫等操做
}
建立OffsetIndex對象{
建立startOffset.index文件
建立對應的RandomAccessFile實例:val raf = new RandomAccessFile(file, "rw")
若是老的index文件存在, 即file.createNewFile返回true, 則設置文件長度爲小於maxIndexSize(默認1m, 最小爲8b), 若是不爲8的倍數, 則取最近的8的倍數 :raf.setLength(roundToExactMultiple(maxIndexSize, 8))
而後經過raf.getChannel.map來內存映射文件, 獲取MappedByteBuffer
最後, 設置buffer的position指針, 若是新文件, 就是0, 老文件, 則是, 文件大小, 而後關閉流
這個類主要的功能就是維護索引, 先是mmap索引文件, 而索引文件中內容是已8個字節爲一個entry, 其中前4個字節爲相對offset(原始offset-baseOffset), 後4個字節爲日誌文件偏移,
查找時採用二分查找, 由於offset在索引文件中是有序的, 同時由於是mmap, 因此查找效率高, 主要用於日誌讀取時使用(LogSegment.translateOffset)
這裏並非每一個消息offset都索引, 而是間隔必定大小索引一次(indexIntervalBytes), 因此查找到文件位置後, 還須要再去log中去查找到精確的位置, 具體的判斷是在LogSegment中實現的.
}
LogSegment是log和index的包裝, 提供一個統一的api來統一的操做index和log, 屏蔽log和index細節.
包含了append, read, flush, delete等方法.
}
}
好了, 通過前面兩次的遍歷, 已經建立好了LogSegment並都放到Log.segments中去了
若是目錄是空的, 就建立一個startOffset=0的LogSegment, 加入到Log.segments中去.
若是目錄不是空的, 就進行Log.recoverLog操做{
首先, 若是是正常關閉的(hasCleanShutdownFile), 則沒啥好恢復的, 設置recoveryPoint爲下一個offset, 結束方法.
非正常結束, 須要恢復recoveryPoint(前面2.4.1有講)以後的LogSegment對應的日誌, 經過Log.segments的方法, 獲取大於recoveryPoint的記錄,
遍歷須要恢復操做的LogSegment列表, 對每一個LogSegment, 遍歷日誌文件, 重建索引, 遍歷的時候校驗消息(computeChecksum等), 一旦某條消息出問題了, 這條消息和它後面的數據都會被刪除.
同時, 改LogSegment以後的LogSegment也會被刪除.
// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
最後設置當前活動的LogSegment(startOffset最大的segment), 的index文件爲config.maxIndexSize, 由於上一步會吧index文件設置爲真實大小, 而當前LogSegment還會有add操做, 會致使index寫失敗.
kafka.log.OffsetIndex.append中會校驗index是否滿了(require(!isFull)).
}
最後, 一個簡單的校驗, 校驗index文件大小是否是8的倍數.
}loadSegments()結束
}new Log()結束
建立完Log實例後, 加入到LogManager.logs中(key:TopicPartition, value:Log實例), 若是存在TopicPartition對應兩個Log實例, 報錯
}Utils.runnable結束
最後, 提交上面的任務(Utils.runnable)到線程池中並行執行, 並收集結果.
}遍歷所有的dir列表, 結束
對每一個log dir, 獲取上面的任務的執行結果, 無異常, 則刪除目錄下面的cleanShutdownFile文件.
最後結束線程池.
}loadLogs 關閉
5.日誌管理器啓動 logManager.startup(){
}
問題記錄:
1.調試過程當中, 碰到了個問題, 啓動的時候, 報了NPE(kafka.log.OffsetIndex.forceUnmap), 調試發現, 是由於方法內部調用了sun.nio.ch.DirectBuffer.cleaner().clean(), 而cleaner()方法可能會返回Null, 致使空異常. 調試DirectBuffer, 他的cleaner是在構造方法的時候初始化的, 當OffsetIndex.mmap屬性初始化的時候, 會將index文件映射爲MappedByteBuffer, 經過sun.nio.ch.FileChannelImpl.map方法, 而當文件大小爲0的時候, 並不會建立cleaner實例, 因此致使DirectBuffer.cleaner().clean()出現NPE異常, 可是爲何index文件會是空的, 明明已經寫入消息了, TODO. 補充一點, sun.misc.Cleaner實現PhantomReference接口, 用來在引用的對象被回收的時, 則就會把對象放到PhantomReference隊列中, 應用能夠經過隊列獲取到Reference對象, 以便作些回收的工做, 看Cleaner代碼時, 發現, 並無使用PhantomReference隊列, 而後查看到java.lang.ref.Reference對象中對Cleaner會優化處理, 當發現爲Cleaner類型時, 直接調用Cleaner.clean方法, 其餘類型則enqueue.