server處理produce請求

server處理produce請求

1,概述

在 Producer Client 端,Producer 會維護一個 ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches 的變量,而後會根據 topic-partition 的 leader 信息,將 leader 在同一臺機器上的 batch 放在一個 request 中,發送到 server,這樣能夠節省不少網絡開銷,提升發送效率。網絡

2,service處理過程

2.1 發送請求

Producer Client 發送請求的方法實現以下:app

 

 

kafka接收到producer請求後,經過其網絡模型,最終會交給KafkaApis組件處理操作系統

 

 

  • l  查看 topic 是否存在,以及 client 是否有相應的 Desribe 權限;
  • l  對於已經有 Describe 權限的 topic 查看是否有 Write 權限;
  • l  調用 replicaManager.appendRecords() 方法向有 Write 權限的 topic-partition 追加相應的 record。

2.2 ReplicaManager

ReplicaManager,副本管理器,做用是管理這臺 broker 上的全部副本(replica)。在 Kafka 中,每一個副本(replica)都會跟日誌實例(Log 對象)一一對應,一個副本會對應一個 Log 對象。.net

ReplicaManager 的並不負責具體的日誌建立,它只是管理 Broker 上的全部分區。在建立 Partition 對象時, Partition 會經過logManager 對象爲每一個 replica 建立對應的日誌。3d

ReplicaManager拿到請求內容後,主要作了以下事情:日誌

  • l  首先判斷 acks 設置是否有效(-1,0,1三個值有效),無效的話直接返回異常,再也不處理;
  • l  acks 設置有效的話,調用 appendToLocalLog() 方法將 records 追加到本地對應的 log 對象中;
  • l  appendToLocalLog() 處理完後,若是發現 clients 設置的 acks=-1,即須要 isr 的其餘的副本同步完成才能返回 response,那麼就會建立一個 DelayedProduce 對象,等待 isr 的其餘副本進行同步,不然的話直接返回追加的結果。

 

 

appendToLocalLog() 的實現code

 

 

從上面能夠看到 appendToLocalLog() 的實現以下:server

  • l  首先判斷要寫的 topic 是否是 Kafka 內置的 topic,內置的 topic 是不容許 Producer 寫入的;
  • l  先查找 topic-partition 對應的 Partition 對象,若是在 allPartitions 中查找到了對應的 partition,那麼直接調用 partition.appendRecordsToLeader() 方法追加相應的 records,不然會向 client 拋出異常。

ReplicaManager 在追加 records 時,調用的是 Partition 的 appendRecordsToLeader() 方法,partiton組件是topic在某個broker上一個副本的抽象。每一個partition對象都會維護一個Replica對象,Replica對象中又維護Log對象,也就是數據目錄的抽象,具體的實現以下:對象

 

 

parititon組件從replicaManager拿到消息後blog

  • l  先判斷本身是不是leader,只有leader才能夠接收producer請求而後寫數據
  • l  判斷當前的當前的ISR數量是否比minInSyncReplicas還小,若是ISR數量小於minInSyncReplicas就拋出異常
  • l  把消息交給本身管理的Log組件處理

2.3 Log

Log對象是對partition數據目錄的抽象。管理着某個topic在某個broker的一個partition,它多是一個leader,也多是replica。同時,Log對象還同時管理着多個LogSegment,也就是日誌的分段。

在 Log 對象的初始化時,有三個變量是比較重要的:

  • l  nextOffsetMetadata:能夠叫作下一個偏移量元數據,它包括 activeSegment 的下一條消息的偏移量,該 activeSegment 的基準偏移量及日誌分段的大小;
  • l  activeSegment:指的是該 Log 管理的 segments 中那個最新的 segment(這裏叫作活躍的 segment),一個 Log 中只會有一個活躍的 segment,其餘的 segment 都已經被持久化到磁盤了;
  • l  logEndOffset:表示下一條消息的 offset,它取自 nextOffsetMetadata 的 offset,實際上就是活動日誌分段的下一個偏移量。

 

 

 

 

 

Server 將每一個分區的消息追加到日誌中時,是以 segment 爲單位的,當 segment 的大小到達閾值大小以後,會滾動新建一個日誌分段(segment)保存新的消息,而分區的消息老是追加到最新的日誌分段(也就是 activeSegment)中。每一個日誌分段都會有一個基準偏移量(segmentBaseOffset,或者叫作 baseOffset),這個基準偏移量就是分區級別的絕對偏移量,並且這個值在日誌分段是固定的。有了這個基準偏移量,就能夠計算出來每條消息在分區中的絕對偏移量,最後把數據以及對應的絕對偏移量寫到日誌文件中。append() 方法的過程能夠總結以下:

  • l  analyzeAndValidateRecords():對這批要寫入的消息進行檢測,主要是檢查消息的大小及 crc 校驗;
  • l  trimInvalidBytes():會將這批消息中無效的消息刪除,返回一個都是有效消息的 MemoryRecords;
  • l  LogValidator.validateMessagesAndAssignOffsets():爲每條消息設置相應的 offset(絕對偏移量) 和 timestrap;
  • l  maybeRoll():判斷是否須要新建一個 segment 的,若是當前的 segment 放不下這批消息的話,須要新建一個 segment;
  • l  segment.append():向 segment 中添加消息;
  • l  更新 logEndOffset 和判斷是否須要刷新磁盤(若是須要的話,調用 flush() 方法刷到磁盤)。
  • 關於 timestrap 的設置,這裏也順便介紹一下,在新版的 Kafka 中,每條 msg 都會有一個對應的時間戳記錄,producer 端能夠設置這個字段 message.timestamp.type 來選擇 timestrap 的類型,默認是按照建立時間,只能選擇從下面的選擇中二選一:
  • l  CreateTime,默認值;
  • l  LogAppendTime。

在 Log 的 append() 方法中,會調用 maybeRoll() 方法來判斷是否須要進行相應日誌分段操做,其具體實現以下:

 

 

從 maybeRoll() 的實現能夠看到,是否須要建立新的日誌分段,有下面幾種狀況:

  • l  當前日誌分段的大小加上消息的大小超過了日誌分段的閾值(log.segment.bytes);
  • l  距離上次建立日誌分段的時間達到了必定的閾值(log.roll.hours),而且數據文件有數據;
  • l  索引文件滿了;
  • l  時間索引文件滿了;
  • l  最大的 offset,其相對偏移量超過了正整數的閾值。

建立一個 segment 對象,真正的實現是在 Log 的 roll() 方法中,建立 segment 對象,主要包括三部分:數據文件、offset 索引文件和 time 索引文件。

2.4 offset索引文件

這裏順便講述一下 offset 索引文件,Kafka 的索引文件有下面幾個特色:

  • l  採用 絕對偏移量+相對偏移量 的方式進行存儲的,每一個 segment 最開始絕對偏移量也是其基準偏移量;
  • l  數據文件每隔必定的大小建立一個索引條目,而不是每條消息會建立索引條目,經過 index.interval.bytes 來配置,默認是 4096,也就是4KB;
  • 這樣作的好處也很是明顯:
  • l  由於不是每條消息都建立相應的索引條目,因此索引條目是稀疏的;
  • l  索引的相對偏移量佔據4個字節,而絕對偏移量佔據8個字節,加上物理位置的4個字節,使用相對索引能夠將每條索引條目的大小從12字節減小到8個字節;
  • l  由於偏移量有序的,再讀取數據時,能夠按照二分查找的方式去快速定位偏移量的位置;
  • l  這樣的稀疏索引是能夠徹底放到內存中,加快偏移量的查找。

2.5LogSegment寫入

真正的日誌寫入,仍是在 LogSegment 的 append() 方法中完成的,LogSegment 會跟 Kafka 最底層的文件通道、mmap 打交道。

 

 

  • l  logSegment底層使用了fileChannel寫日誌,寫完以後還會判斷是否要更新當前logSegment的最大時間戳
  • l  每當寫入消息的大小積累到必定程度時,會新插入一條索引記錄。這個積累的大小和配置index.interval.bytes有關係

kafka底層的寫數據是根據fileChannel來寫的,它寫的時候不會馬上刷盤,而是開啓了一個定時任務根據策略去刷盤。可是在默認狀況下,這個定時任務又是不刷盤的(刷盤策略都不知足),kafka把刷盤的時機交給操做系統來掌控。

 


總結上述的流程以下圖所示

 

 

 

3參考資料:

https://blog.csdn.net/u013332124/article/details/82778419

http://matt33.com/2018/03/18/kafka-server-handle-produce-request/

相關文章
相關標籤/搜索