Server端處理fetchRequest請求

Server端處理fetchRequest請求

1前言

在consumer章節,咱們知道,在consumer的pollOnce()中調用sendFetches()方法,html

 

本節主要介紹服務端處理fetchRequest請求的過程,FetchRequest由服務端函數KafkaApis.handleFetchRequest處理, FetchReuqest中重要的字段是requestInfo: Map[TopicAndPartition, PartitionFetchInfo])函數

即對於Fetch請求來講,關注點是TopicAndParititon執行Fetch的offset以及FetchSize。 post

其實Kafka的主從同步也是經過FetchRequest來完成,與consumer拉取消息的過程類似,都在handleFetchRequest()中進行處理,不過broker對他們的處理在身份驗證上作了區分,下面咱們看具體的FetchRequest處理過程:fetch

2 handleFetchRequest()處理過程

該函數的源碼以下:ui

 

 

 

 

在函數的開頭部分spa

 

 

先是執行readMessageSets讀取了log上當前可讀的數據,這個數據量若是已經知足了Fetch的條件的話,後面會直接返回。以後會判斷Fetch是不是來自於Follower的同步請求,若是是,則會調用recordFollowerLogEndOffsets記錄Follower的offset。.net

該函數會調用ReplicaManager.updateReplicaLEOAndPartitionHW:3d

 

 

雖然執行的代碼量不多,但recordFollowerLogEndOffsets帶來的影響很大:日誌

  • l  根據Fetch讀到的message的offset表明了follower的leo,因此replica中的logEndOffsetMetadata和logEndOffsetUpdateTimeMsValue變量會更新;
  • l  replicaManager.maybeShrinkIsr函數做爲一個按期任務,會根據replica的logEndOffsetMetadata和logEndOffsetUpdateTimeMsValue變量清理ISR,將leo落後太多或者長時間沒更新的replica從ISR中踢出;
  • l  replica的leo更新,若是知足條件leo > leaderHw,則該replica有可能會成爲ISR中的一員,並更新zk中的ISR記錄。
  • l  若是replica原本就是ISR,leo更新意味着leaderHw也有可能會發生變化。
  • l  在requiredAcks>1或者=-1時,DelayedProduce請求所需條件與replica是否知足leo>requiredOffset有關,因此須要調用producerRequestPurgatory.update;
  • l  若是FetchRequest不想等待,或者已經讀到了足夠的數據,FetchRequest會直接使用已經讀到的數據進行返回。
  • l  不然,會執行以下代碼:

 

 

這裏和ProducerRequest同樣,將FetchRequest組裝爲DelayedFetch並加入到Purgatory中。code

若是不是來自replica的請求,調用ReplicaManager.fetchMessages()方法,從本地副本中獲取數據,並等待足夠多的數據進行返回,其中傳入的responseCallback方法在超時或者是知足fetch條件時將會被調用,將結果返回給客戶端。

 

 

 

總體來講,分爲如下幾步:

  • l  readFromLocalLog():調用該方法,從本地日誌拉取相應的數據;
  • l  判斷 Fetch 請求來源,若是來自副本同步,那麼更新該副本的 the end offset 記錄,若是該副本不在 isr 中,並判斷是否須要更新 isr;
  • l  返回結果,知足條件的話立馬返回,不然的話,經過延遲操做,延遲返回結果。

 

 

 

 

 

 

readFromLocalLog() 方法的處理過程:

  • l  先根據要拉取的 topic-partition 獲取對應的 Partition 對象,根據 Partition 對象獲取對應的 Replica 對象;
  • l  根據 Replica 對象找到對應的 Log 對象,而後調用其 read() 方法從指定的位置讀取數據。

存儲層對 Fetch 請求的處理

每一個 Replica 會對應一個 log 對象,而每一個 log 對象會管理相應的 LogSegment 實例。

Log 對象的 read() 方法的實現以下所示:

 

 

從實現能夠看出,該方法會先查找對應的 Segment 對象(日誌分段),而後循環直到讀取到數據結束,若是當前的日誌分段沒有讀取到相應的數據,那麼會更新日誌分段及對應的最大位置。讀取日誌分段時,會先讀取 offset 索引文件再讀取數據文件,具體步驟以下:

  • l  根據要讀取的起始偏移量(startOffset)讀取 offset 索引文件中對應的物理位置;
  • l  查找 offset 索引文件最後返回:起始偏移量對應的最近物理位置(startPosition);
  • l  根據 startPosition 直接定位到數據文件,而後讀取數據文件內容;
  • l  最多能讀到數據文件的結束位置(maxPosition)。

LogSegment

關乎 數據文件、offset 索引文件和時間索引文件真正的操做都是在 LogSegment 對象中的,日誌讀取也與這個方法息息相關。

read() 方法的實現以下:

 

 

從上面的實現來看,上述過程分爲如下三部分:

  • l  根據 startOffset 獲得實際的物理位置(translateOffset());
  • l  計算要讀取的實際物理長度;
  • l  根據實際起始物理位置和要讀取實際物理長度讀取數據文件。

translateOffset()

translateOffset() 方法的實現過程主要分爲兩部分:

  • l  查找 offset 索引文件:調用 offset 索引文件的 lookup() 查找方法,獲取離 startOffset 最接近的物理位置;
  • l  調用數據文件的 searchFor() 方法,從指定的物理位置開始讀取每條數據,知道找到對應 offset 的物理位置。

 

 

 

參考資料:

https://blog.csdn.net/c395318621/article/details/53164123

http://www.daleizhou.tech/posts/consume-messages.html

相關文章
相關標籤/搜索