筆者在平時工做中時時都在享受kafka的性能和穩定性,但始終沒有機會真正瞭解kafka的設計思想和設計原理,最近報名了極客時間的一門課,開始堅持閱讀kafka源代碼。在此期間我也會持續更新本身梳理註釋過的源代碼。
鑑於本人的Scala代碼能力有限,加上kafka的代碼層次較深,在閱讀源碼的初期我會忽略一些kafka複雜設計的細節以及嵌套過深的方法調用,先從最核心的最小單元類開始,以後本身有了更全面的把握以後會有更全面的分析總結。
一點源碼相關信息
本次源碼閱讀的是kafka的主分支,kafka主分支。
ide採用Intellij idea,編譯工具gradle版本6.3。git
kafa的源碼中設計最豐富也最精妙的是server端或者說啥broker端的代碼,而kafka和衆多消息系統不一樣的地方在於kafka傳輸的消息是須要持久化的,這一特色在很大程度上造就kafka的容錯性。
正如kafka官方所指出的:github
As a result of taking storage seriously and allowing the clients to control their read position, you can think of Kafka as a kind of special purpose distributed filesystem dedicated to high-performance, low-latency commit log storage, replication, and propagation.
kafka從文件系統的角度來看是一個專一於提交日誌存儲、複製和增加的分佈式文件系統。從某種程度上來講,kafka的日誌部分上kafka全部上層系統的基石,也是最直接和系統相關的部分。掌握這一部分的源碼可以頗有效提升咱們對於kafka存儲功能的理解,對於kafka broker在文件系統上建立的文件的意義和做用,對於kafka的使用中可能出現的日誌問題會有更清晰的解決思路。apache
首先來看一下Log對象,以下可見官網的截圖:
kafka主題的一個分區對應一個Log對象,生產者會按順序向指定的分區追加日誌消息。若是咱們只關注單個分區內消息的追加過程:
上圖所示的是官網對於kafka log的抽象圖,事實上真正的日誌的管理遠比示意圖複雜,圖中的一個個位移數值表明的也不是簡單的一條消息,而是日誌段,消息會追加入對應的日誌段,而日誌段之間的位移保持有序,所以消息之間的保持有序。
每個分區中的消息以Log的形式存在,但Log並非日誌存儲和操做的最底層單元,kafka消息最底層的單元的是日誌段,即LogSegemnt, Log和LogSegement的關係如上圖所示。編程
這裏須要注意的是,kafka的日誌段並非一個物理概念,而是一個邏輯概念,一個日誌段包括一個消息日誌文件和若干索引文件組成,即一個.log和多個.xxxindex文件:
如圖是筆者剛執行了kafka官方的quickstart以後的topic對應文件夾(由於新增的test主題設置爲單分區,故只有一個分區文件夾,該文件夾下全部的文件對應一個Log)下的文件,由於筆者用生產者產生的消息條數不多,因此生成了一個日誌段,故topic文件夾下只有一個對應日誌段,因此只有一組對應的.log和.index文件。
leader-epoch文件涉及到的功能咱們在源碼閱讀初期不會關注,故在此不作敘述。緩存
如今咱們進入源碼閱讀部分。kafka日誌段相關的代碼位於kafa.core.src.main.scala.kafka.log.LogSegment.scala。
該文件包含一個class LogSegment
和兩個Object(scala語法,單例對象
):object LogSegment
和 object LogFlushStats
。
本次咱們主要關注class logSegment
。app
/** * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in * any previous segment. */ class LogSegment private[log] (val log: FileRecords, val lazyOffsetIndex: LazyIndex[OffsetIndex], val lazyTimeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, val time: Time) extends Logging
這裏目前咱們只須要關注以下的一些參數。
類的定義直接明瞭地驗證了上面所說的,一個日誌段對應一個日誌文化和一系列索引文件,其中log
對應.log文件,而lazyOffsetIndex, lazyTimeIndex
對應默認配置必定會存儲的兩種索引文件,偏移量索引和時間戳索引文件,有關這幾種文件的具體內容會在後續的源碼閱讀系列文章中介紹。此處還有一個重要的參數是須要關注的,即baseOffset
。官方註釋對它的定義是編程語言
//@param baseOffset A lower bound on the offsets in this segment
baseOffset
表示該日誌段的消息位移的下限,即日誌段中全部的消息的位移最小值爲baseOffset,索引消息的位移值均大於等於baseOffset,如今咱們再回想以前的分區文件夾的截圖分佈式
如咱們以前所說,文件夾裏面只包含一個日誌段,全部的文件屬於一個日誌段。而由於這個日誌段是整個分區的第一個日誌段,因此它的baseOffset爲0,按照kafka的定義,一個日誌段用baseOffset做爲名字,將baseOffset補全爲20位即爲咱們所見的.log和.index文件的文件名。由此也baseOffset的重要性也可見一斑。ide
indexIntervalBytes
,即Broker 端參數 log.index.interval.bytes,它控制了索引文件的增長頻率。
對於日誌段而言,最重要的就是日誌的讀取和寫入了,因此咱們要重點理解日誌段的append
和read
方法。另外,recover 方法一樣很關鍵,它是 Broker 重啓後恢復日誌段的操做邏輯,這也是kafka高可用性和高容錯性的保證之一。工具
這裏咱們主要分析LogSegment的append()
方法。
@nonthreadsafe def append(largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { if (records.sizeInBytes > 0) {//確保須要寫入的消息集合不爲空 trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " + s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp") val physicalPosition = log.sizeInBytes()//確認當前.log文件已經寫到的位置 if (physicalPosition == 0)//若是日誌段爲空 rollingBasedTimestamp = Some(largestTimestamp)//更新用於日誌段切分的時間戳 ensureOffsetInRange(largestOffset)//確保要寫入的消息集合中位移最大值大於等於要寫入的日誌段的baseOffset且小於等於Int的最大值。 // append the messages val appendedBytes = log.append(records) trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset") // Update the in memory max timestamp and corresponding offset. if (largestTimestamp > maxTimestampSoFar) { maxTimestampSoFar = largestTimestamp offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp } // append an entry to the index (if needed) if (bytesSinceLastIndexEntry > indexIntervalBytes) {//上一次寫入已經大於索引文件間斷值,須要新增索引項 offsetIndex.append(largestOffset, physicalPosition)//寫入新位移索引,位移索引保存消息位移值與物理文件寫入位置的對應關係 timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)//寫入新時間戳索引,時間戳索引項保存時間戳與消息位移的對應關係 bytesSinceLastIndexEntry = 0//清空已寫字節數 } bytesSinceLastIndexEntry += records.sizeInBytes//追加寫入字節數,以便下一次append使用。 } }
append
方法接收 4 個參數,分別表示待寫入消息批次中消息的最大位移值、最大時間戳、最大時間戳對應消息的位移以及真正要寫入的消息集合。
整個方法主要能夠分爲以下五步:
第一步:在源碼中,首先調用 log.sizeInBytes 方法判斷該日誌段是否爲空,若是是空的話, Kafka 須要記錄要寫入消息集合的最大時間戳,並將其做爲後面新增日誌段倒計時的依據。
第二步:代碼調用 ensureOffsetInRange 方法確保輸入參數最大位移值是合法的,即檢查largestOffset - baseOffset 的值是否是介於 [0,Int.MAXVALUE] 之間。
第三步:待這些作完以後,append 方法調用 FileRecords 的 append 方法執行真正的寫入。前面說過了,專欄後面咱們會詳細介紹 FileRecords 類。這裏你只須要知道它的工做是將內存中的消息對象寫入到操做系統的頁緩存就能夠了。
第四步:再下一步,就是更新日誌段的最大時間戳以及最大時間戳所屬消息的位移值屬性。每一個日誌段都要保存當前最大時間戳信息和所屬消息的位移信息。最大時間戳對應的消息的位移值則用於時間戳索引項。位移索引則記錄了消息位移和物理文件寫入位置的對應關係。
第五步:append 方法的最後一步就是更新索引項和寫入的字節數了。我在前面說過,日誌段每寫入 indexIntervalBytes量的數據就要寫入一個索引項。當已寫入字節數超過了 indexIntervalBytes的量就要寫入一個索引項,append 方法會調用索引對象的 append 方法新增索引項,同時清空已寫入字節數,以備下次從新累積計算。
read方法將從當前日誌段中讀取消息集合,源碼以下:
@threadsafe def read(startOffset: Long, maxSize: Int, maxPosition: Long = size, minOneMessage: Boolean = false): FetchDataInfo = { if (maxSize < 0) throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log") val startOffsetAndSize = translateOffset(startOffset)//將搜索offsetIndex文件,獲取符合條件的第一條消息的:1.位移值 2.消息大小 3.消息的物理文件位置 //置於startOffsetAndSize容器中 // if the start position is already off the end of the log, return null if (startOffsetAndSize == null)//若是該日誌段沒有符合條件的消息,返回空 return null val startPosition = startOffsetAndSize.position//從startOffsetAndSize 獲取消息物理位置 val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)//獲取日誌位移的元數據 val adjustedMaxSize = //更新計劃讀取的最大字節數 if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)//狀況一,至少一條設置爲true,則須要讀取的字節數= max(計劃要讀取字節數,第一條位移知足條件的消息的大小) else maxSize//狀況二,至少一條設置爲false,adjustedMaxSize直接等於maxSize // return a log segment but with zero size in the case below if (adjustedMaxSize == 0)//第一條大於初始位移值的消息的內容爲空或者要去讀取的字節數爲0 return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) // calculate the length of the message set to read based on whether or not they gave us a maxOffset val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)//更新真正能讀取的最大字節數,爲 //求日誌段最大的物理位置和初始物理位置的差值(當前日誌段能夠讀取的最大的字節數),再和更新後的計劃要讀取的字節數取較小值 FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)//若是要讀取的第一條消息過大,不是完整存在於當前日誌段,記錄該消息還沒讀取完 }
read
方法接收 4 個輸入參數。
其中第四個參數的含義須要額外解釋一下,即minOneMessage
。當這個參數爲 true 時,即便出現消息體字節數超過了 maxSize 的情形,read 方法依然能返回至少一條消息。引入這個參數主要是爲了確保不出現消費者須要讀取的數據大於一個日誌段的大小就始終消費不到數據的狀況 。
日誌讀取主要分爲三個步驟:
FileRecords
)的方法截取日誌段內容,讀取消息集合還有一個方法也須要咱們重點關注,那就是kafka日誌段的恢復方法。kafka broker會有須要重啓的時候,重啓後的機子如何恢復到宕機前的狀態是保證kafka高可用性的重點之一,而其中消息的恢復是最主要的內容之一,到底層的話,須要恢復的實際就是日誌段。
日誌段恢復所作的事情就是:Broker 在啓動時會從磁盤上加載全部日誌段信息到內存中,並建立相應的 LogSegment 對象實例。在這個過程當中,它須要執行一系列的操做。所以咱們須要學習瞭解 LogSegement的recover
方法,方法的源碼以下:
@nonthreadsafe def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = { offsetIndex.reset() timeIndex.reset() txnIndex.reset()//清空各類索引文件 var validBytes = 0//重置字節數 var lastIndexEntry = 0//重置索引記錄 maxTimestampSoFar = RecordBatch.NO_TIMESTAMP//重置最大時間戳 try { for (batch <- log.batches.asScala) {//遍歷日誌段文件中的全部消息集合 batch.ensureValid()//檢查消息集合內容是否符合kafka的二進制格式 ensureOffsetInRange(batch.lastOffset)//檢驗消息位移值合法性 // The max timestamp is exposed at the batch level, so no need to iterate the records if (batch.maxTimestamp > maxTimestampSoFar) {//更新最大時間戳和對應的消息位移,後序用於時間戳索引 maxTimestampSoFar = batch.maxTimestamp offsetOfMaxTimestampSoFar = batch.lastOffset } // Build offset index if (validBytes - lastIndexEntry > indexIntervalBytes) {//重建位移索引 offsetIndex.append(batch.lastOffset, validBytes)//位移索引記錄位移值和物理文件位置(字節數)的對應關係,更新位移索引 timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) lastIndexEntry = validBytes } validBytes += batch.sizeInBytes()//更新消息總字節數 if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {//更新事務性消費者和leaderEpoch(暫且不清楚原理) leaderEpochCache.foreach { cache => if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _)) cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } updateProducerState(producerStateManager, batch) } } } catch { case e@ (_: CorruptRecordException | _: InvalidRecordException) => warn("Found invalid messages in log segment %s at byte offset %d: %s. %s" .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause)) } val truncated = log.sizeInBytes - validBytes//開始執行消息截斷,截除不合法的字節,爲什麼會存在讀取的字節數大於實際的.log文件中的消息本體,須要後續深刻研究 if (truncated > 0) debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery") log.truncateTo(validBytes)//.log文件即消息本體截斷 offsetIndex.trimToValidSize()//位移索引對應截斷 // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)//最後確保更新時間戳索引 timeIndex.trimToValidSize()//時間戳索引對應截斷 truncated }
recover
方法接受兩個參數,producerStateManager
和leaderEpocheCache
,前者和對應該日誌段的baseOffset的事務性消費者狀態管理有關,咱們前面也說個這個基礎位移值是一個日誌段最重要的參數,然後者屬於相對複雜的功能,筆者對於源碼的系統理解還不足以說清楚LeaderEpoche這個機制,故這個話題暫且略過,但這個參數的忽略對咱們理解recover方法並不會有很大的影響。
recover方法的處理流程可用以下流程圖表示:
本系列kafka源碼解讀下一期內容將會繼續深挖消息日誌的存儲模塊,分析kafka消息存儲中的索引文件的做用以及Log(對應topic分區)和LogSegment的關係等內容。在本篇中咱們只看到在某些場景下索引文件會須要增長和修改和清空,以後咱們會看到它們具體起到了什麼樣的做用。
Kafka源碼解讀
Apache Kafka
Kafka官方的代碼註釋仍是相對很完善的,可是英文的理解有時候須要下一些功夫,對於不太好理解的方法,一層層深刻下去,去看到它的底層,返回類型是什麼,對於理解源碼組成部分的功能有很大的幫助。
同時我也感覺到,所謂的編程語言的差別性不少時候真的不是閱讀源碼的最大障礙,在理解功能過程當中學習語法,一個是實用性有了保證,二一個時間上也節省了很多,遇到不會的,技術社區問就行了,StackOverflow是個好地方,也相信往後的思否也會成爲有這樣技術沉澱和氛圍的地方。我也會繼續堅持,一是堅持本身的追求,二也但願爲社區做出貢獻,和你們一同進步提升。