在consumer章節,咱們知道,在consumer的pollOnce()中調用sendFetches()方法,html
本節主要介紹服務端處理fetchRequest請求的過程,FetchRequest由服務端函數KafkaApis.handleFetchRequest處理, FetchReuqest中重要的字段是requestInfo: Map[TopicAndPartition, PartitionFetchInfo])函數
即對於Fetch請求來講,關注點是TopicAndParititon執行Fetch的offset以及FetchSize。 post
其實Kafka的主從同步也是經過FetchRequest來完成,與consumer拉取消息的過程類似,都在handleFetchRequest()中進行處理,不過broker對他們的處理在身份驗證上作了區分,下面咱們看具體的FetchRequest處理過程:fetch
該函數的源碼以下:ui
在函數的開頭部分spa
先是執行readMessageSets讀取了log上當前可讀的數據,這個數據量若是已經知足了Fetch的條件的話,後面會直接返回。以後會判斷Fetch是不是來自於Follower的同步請求,若是是,則會調用recordFollowerLogEndOffsets記錄Follower的offset。.net
該函數會調用ReplicaManager.updateReplicaLEOAndPartitionHW:3d
雖然執行的代碼量不多,但recordFollowerLogEndOffsets帶來的影響很大:日誌
這裏和ProducerRequest同樣,將FetchRequest組裝爲DelayedFetch並加入到Purgatory中。code
若是不是來自replica的請求,調用ReplicaManager.fetchMessages()方法,從本地副本中獲取數據,並等待足夠多的數據進行返回,其中傳入的responseCallback方法在超時或者是知足fetch條件時將會被調用,將結果返回給客戶端。
總體來講,分爲如下幾步:
readFromLocalLog() 方法的處理過程:
存儲層對 Fetch 請求的處理
每一個 Replica 會對應一個 log 對象,而每一個 log 對象會管理相應的 LogSegment 實例。
Log 對象的 read()
方法的實現以下所示:
從實現能夠看出,該方法會先查找對應的 Segment 對象(日誌分段),而後循環直到讀取到數據結束,若是當前的日誌分段沒有讀取到相應的數據,那麼會更新日誌分段及對應的最大位置。讀取日誌分段時,會先讀取 offset 索引文件再讀取數據文件,具體步驟以下:
LogSegment
關乎 數據文件、offset 索引文件和時間索引文件真正的操做都是在 LogSegment 對象中的,日誌讀取也與這個方法息息相關。
read()
方法的實現以下:
從上面的實現來看,上述過程分爲如下三部分:
translateOffset()
translateOffset()
方法的實現過程主要分爲兩部分:
lookup()
查找方法,獲取離 startOffset 最接近的物理位置;searchFor()
方法,從指定的物理位置開始讀取每條數據,知道找到對應 offset 的物理位置。
參考資料: