__consumer_offsets:做用是保存 Kafka 消費者的位移信息
__transaction_state:用來存儲事務日誌消息apache
所謂的優先副本是指在AR集合列表中的第一個副本。
理想狀況下,優先副本就是該分區的leader 副本,因此也能夠稱之爲 preferred leader。Kafka 要確保全部主題的優先副本在 Kafka 集羣中均勻分佈,這樣就保證了全部分區的 leader 均衡分佈。以此來促進集羣的負載均衡,這一行爲也能夠稱爲「分區平衡」。緩存
Kafka 中的消息是以主題爲基本單位進行歸類的,各個主題在邏輯上相互獨立。每一個主題又能夠分爲一個或多個分區。不考慮多副本的狀況,一個分區對應一個日誌(Log)。爲了防止 Log 過大,Kafka 又引入了日誌分段(LogSegment)的概念,將 Log 切分爲多個 LogSegment,至關於一個巨型文件被平均分配爲多個相對較小的文件。安全
Log 和 LogSegment 也不是純粹物理意義上的概念,Log 在物理上只以文件夾的形式存儲,而每一個 LogSegment 對應於磁盤上的一個日誌文件和兩個索引文件,以及可能的其餘文件(好比以「.txnindex」爲後綴的事務索引文件)網絡
每一個日誌分段文件對應了兩個索引文件,主要用來提升查找消息的效率。
偏移量索引文件用來創建消息偏移量(offset)到物理地址之間的映射關係,方便快速定位消息所在的物理文件位置
時間戳索引文件則根據指定的時間戳(timestamp)來查找對應的偏移量信息。負載均衡
Kafka是經過seek() 方法來指定消費的,在執行seek() 方法以前要去執行一次poll()方法,等到分配到分區以後會去對應的分區的指定位置開始消費,若是指定的位置發生了越界,那麼會根據auto.offset.reset 參數設置的狀況進行消費。性能
Kafka提供了一個 offsetsForTimes() 方法,經過 timestamp 來查詢與此對應的分區位置。offsetsForTimes() 方法的參數 timestampsToSearch 是一個 Map 類型,key 爲待查詢的分區,而 value 爲待查詢的時間戳,該方法會返回時間戳大於等於待查詢時間的第一條消息對應的位置和時間戳,對應於 OffsetAndTimestamp 中的 offset 和 timestamp 字段。操作系統
日誌刪除(Log Retention):按照必定的保留策略直接刪除不符合條件的日誌分段。
咱們能夠經過 broker 端參數 log.cleanup.policy 來設置日誌清理策略,此參數的默認值爲「delete」,即採用日誌刪除的清理策略。線程
基於時間
日誌刪除任務會檢查當前日誌文件中是否有保留時間超過設定的閾值(retentionMs)來尋找可刪除的日誌分段文件集合(deletableSegments)retentionMs 能夠經過 broker 端參數 log.retention.hours、log.retention.minutes 和 log.retention.ms 來配置,其中 log.retention.ms 的優先級最高,log.retention.minutes 次之,log.retention.hours 最低。默認狀況下只配置了 log.retention.hours 參數,其值爲168,故默認狀況下日誌分段文件的保留時間爲7天。
刪除日誌分段時,首先會從 Log 對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,以保證沒有線程對這些日誌分段進行讀取操做。而後將日誌分段所對應的全部文件添加上「.deleted」的後綴(固然也包括對應的索引文件)。最後交由一個以「delete-file」命名的延遲任務來刪除這些以「.deleted」爲後綴的文件,這個任務的延遲執行時間能夠經過 file.delete.delay.ms 參數來調配,此參數的默認值爲60000,即1分鐘。scala
基於日誌起始偏移量
基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量 baseOffset 是否小於等於 logStartOffset,如果,則能夠刪除此日誌分段。
設計
如上圖所示,假設 logStartOffset 等於25,日誌分段1的起始偏移量爲0,日誌分段2的起始偏移量爲11,日誌分段3的起始偏移量爲23,經過以下動做收集可刪除的日誌分段的文件集合 deletableSegments:
從頭開始遍歷每一個日誌分段,日誌分段1的下一個日誌分段的起始偏移量爲11,小於 logStartOffset 的大小,將日誌分段1加入 deletableSegments。
日誌分段2的下一個日誌偏移量的起始偏移量爲23,也小於 logStartOffset 的大小,將日誌分段2加入 deletableSegments。
日誌分段3的下一個日誌偏移量在 logStartOffset 的右側,故從日誌分段3開始的全部日誌分段都不會加入 deletableSegments。
收集完可刪除的日誌分段的文件集合以後的刪除操做同基於日誌大小的保留策略和基於時間的保留策略相同
日誌壓縮(Log Compaction):針對每一個消息的 key 進行整合,對於有相同 key 的不一樣 value 值,只保留最後一個版本。
若是要採用日誌壓縮的清理策略,就須要將 log.cleanup.policy 設置爲「compact」,而且還須要將 log.cleaner.enable (默認值爲 true)設定爲 true。
以下圖所示,Log Compaction 對於有相同 key 的不一樣 value 值,只保留最後一個版本。若是應用只關心 key 對應的最新 value 值,則能夠開啓 Kafka 的日誌清理功能,Kafka 會按期將相同 key 的消息進行合併,只保留最新的 value 值。
頁緩存是操做系統實現的一種主要的磁盤緩存,以此用來減小對磁盤 I/O 的操做。具體來講,就是把磁盤中的數據緩存到內存中,把對磁盤的訪問變爲對內存的訪問。
當一個進程準備讀取磁盤上的文件內容時,操做系統會先查看待讀取的數據所在的頁(page)是否在頁緩存(pagecache)中,若是存在(命中)則直接返回數據,從而避免了對物理磁盤的 I/O 操做;若是沒有命中,則操做系統會向磁盤發起讀取請求並將讀取的數據頁存入頁緩存,以後再將數據返回給進程。
一樣,若是一個進程須要將數據寫入磁盤,那麼操做系統也會檢測數據對應的頁是否在頁緩存中,若是不存在,則會先在頁緩存中添加相應的頁,最後將數據寫入對應的頁。被修改事後的頁也就變成了髒頁,操做系統會在合適的時間把髒頁中的數據寫入磁盤,以保持數據的一致性。
用過 Java 的人通常都知道兩點事實:對象的內存開銷很是大,一般會是真實數據大小的幾倍甚至更多,空間使用率低下;Java 的垃圾回收會隨着堆內數據的增多而變得愈來愈慢。基於這些因素,使用文件系統並依賴於頁緩存的作法明顯要優於維護一個進程內緩存或其餘結構,至少咱們能夠省去了一份進程內部的緩存消耗,同時還能夠經過結構緊湊的字節碼來替代使用對象的方式以節省更多的空間。
此外,即便 Kafka 服務重啓,頁緩存仍是會保持有效,然而進程內的緩存卻須要重建。這樣也極大地簡化了代碼邏輯,由於維護頁緩存和文件之間的一致性交由操做系統來負責,這樣會比進程內維護更加安全有效。
除了消息順序追加、頁緩存等技術,Kafka 還使用零拷貝(Zero-Copy)技術來進一步提高性能。所謂的零拷貝是指將數據直接從磁盤文件複製到網卡設備中,而不須要經由應用程序之手。零拷貝大大提升了應用程序的性能,減小了內核和用戶模式之間的上下文切換。對 Linux 操做系統而言,零拷貝技術依賴於底層的 sendfile() 方法實現。對應於 Java 語言,FileChannal.transferTo() 方法的底層實現就是 sendfile() 方法。
Kafka 中有多種延時操做,好比延時生產,還有延時拉取(DelayedFetch)、延時數據刪除(DelayedDeleteRecords)等。
延時操做建立以後會被加入延時操做管理器(DelayedOperationPurgatory)來作專門的處理。延時操做有可能會超時,每一個延時操做管理器都會配備一個定時器(SystemTimer)來作超時管理,定時器的底層就是採用時間輪(TimingWheel)實現的。
在 Kafka 集羣中會有一個或多個 broker,其中有一個 broker 會被選舉爲控制器(Kafka Controller),它負責管理整個集羣中全部分區和副本的狀態。當某個分區的 leader 副本出現故障時,由控制器負責爲該分區選舉新的 leader 副本。當檢測到某個分區的 ISR 集合發生變化時,由控制器負責通知全部broker更新其元數據信息。當使用 kafka-topics.sh 腳本爲某個 topic 增長分區數量時,一樣仍是由控制器負責分區的從新分配。
如上圖,舊版消費者客戶端每一個消費組(
每一個消費者在啓動時都會在 /consumers/
這種方式下每一個消費者對 ZooKeeper 的相關路徑分別進行監聽,當觸發再均衡操做時,一個消費組下的全部消費者會同時進行再均衡操做,而消費者之間並不知道彼此操做的結果,這樣可能致使 Kafka 工做在一個不正確的狀態。與此同時,這種嚴重依賴於 ZooKeeper 集羣的作法還有兩個比較嚴重的問題。
就目前而言,一共有以下幾種情形會觸發再均衡的操做:
GroupCoordinator 是 Kafka 服務端中用於管理消費組的組件。而消費者客戶端中的 ConsumerCoordinator 組件負責與 GroupCoordinator 進行交互。
消費者須要肯定它所屬的消費組對應的 GroupCoordinator 所在的 broker,並建立與該 broker 相互通訊的網絡鏈接。若是消費者已經保存了與消費組對應的 GroupCoordinator 節點的信息,而且與它之間的網絡鏈接是正常的,那麼就能夠進入第二階段。不然,就須要向集羣中的某個節點發送 FindCoordinatorRequest 請求來查找對應的 GroupCoordinator,這裏的「某個節點」並不是是集羣中的任意節點,而是負載最小的節點。
在成功找到消費組所對應的 GroupCoordinator 以後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。
選舉消費組的leader
若是消費組內尚未 leader,那麼第一個加入消費組的消費者即爲消費組的 leader。若是某一時刻 leader 消費者因爲某些緣由退出了消費組,那麼會從新選舉一個新的 leader
選舉分區分配策略
leader 消費者根據在第二階段中選舉出來的分區分配策略來實施具體的分區分配,在此以後須要將分配的方案同步給各個消費者,經過 GroupCoordinator 這個「中間人」來負責轉發同步分配方案的。
進入這個階段以後,消費組中的全部消費者就會處於正常工做狀態。在正式消費以前,消費者還須要肯定拉取消息的起始位置。假設以前已經將最後的消費位移提交到了 GroupCoordinator,而且 GroupCoordinator 將其保存到了 Kafka 內部的 __consumer_offsets 主題中,此時消費者能夠經過 OffsetFetchRequest 請求獲取上次提交的消費位移並今後處繼續消費。
消費者經過向 GroupCoordinator 發送心跳來維持它們與消費組的從屬關係,以及它們對分區的全部權關係。只要消費者以正常的時間間隔發送心跳,就被認爲是活躍的,說明它還在讀取分區中的消息。心跳線程是一個獨立的線程,能夠在輪詢消息的空檔發送心跳。若是消費者中止發送心跳的時間足夠長,則整個會話就被斷定爲過時,GroupCoordinator 也會認爲這個消費者已經死亡,就會觸發一次再均衡行爲。
爲了實現生產者的冪等性,Kafka 爲此引入了 producer id(如下簡稱 PID)和序列號(sequence number)這兩個概念。
每一個新的生產者實例在初始化的時候都會被分配一個 PID,這個 PID 對用戶而言是徹底透明的。對於每一個 PID,消息發送到的每個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將 <PID,分區> 對應的序列號的值加1。
broker 端會在內存中爲每一對 <PID,分區> 維護一個序列號。對於收到的每一條消息,只有當它的序列號的值(SN_new)比 broker 端中維護的對應的序列號的值(SN_old)大1(即 SN_new = SN_old + 1)時,broker 纔會接收它。若是 SN_new< SN_old + 1,那麼說明消息被重複寫入,broker 能夠直接將其丟棄。若是 SN_new> SN_old + 1,那麼說明中間有數據還沒有寫入,出現了亂序,暗示可能有消息丟失,對應的生產者會拋出 OutOfOrderSequenceException,這個異常是一個嚴重的異常,後續的諸如 send()、beginTransaction()、commitTransaction() 等方法的調用都會拋出 IllegalStateException 的異常。