kafka是你們比較經常使用的消息中間件,本文主要介紹kafka基本組件及其相關原理網絡
基本架構
- Broker:消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker能夠組成一個Kafka集羣
- Topic:Kafka根據topic對消息進行歸類,發佈到Kafka集羣的每條消息都須要指定一個topic
- Producer:消息生產者,向Broker發送消息的客戶端
- Consumer:消息消費者,從Broker讀取消息的客戶端
- ConsumerGroup:每一個Consumer屬於一個特定的Consumer Group,一條消息能夠發送到多個不一樣的Consumer Group,可是一個Consumer Group中只能有一個Consumer可以消費該消息
- Partition:物理上的概念,一個topic能夠分爲多個partition,每一個partition內部是有序的
偏移量
Kafka經過offset保證消息在分區內的順序,offset的順序性不跨分區 Kafka0.10之後,使用一個專門的topic __consumer_offset保存offset __consumer_offset日誌留存方式爲compact,也就是說,該topic會對key相同的消息進行整理架構
__consumer_offset內保存三類消息:
- Consumer group組元數據消息
- Consumer group位移消息
- Tombstone消息
kafka log
存儲
每一個Partition其實都會對應一個日誌目錄:{topicName}-{partitionid}/,在目錄下面會對應多個日誌分段(LogSegment)。LogSegment文件由兩部分組成,分別爲「.index」文件和「.log」文件 app
索引文件使用稀疏索引的方式,避免對日誌每條數據建索引,節省存儲空間
發送
使用page cache順序讀文件,操做系統能夠預讀數據到 page cache 使用mmap直接將日誌文件映射到虛擬地址空間 操作系統
read()是系統調用,首先將文件從硬盤拷貝到內核空間的一個緩衝區,再將這些數據拷貝到用戶空間,實際上進行了兩次數據拷貝; mmap()也是系統調用,但沒有進行數據拷貝,當缺頁中斷髮生時,直接將文件從硬盤拷貝到用戶空間,只進行了一次數據拷貝。 Java中使用MappedByteBuffer封裝了mmap
零拷貝:消息數據直接從 page cache 發送到網絡 一般的文件讀取須要經歷下圖的流程,有兩次用戶態與內核態之間內存的拷貝 3d
kafka使用零拷貝,避免消息在內核態和用戶態間的來回拷貝
副本
-
每個分區都存在一個ISR(in-sync replicas)日誌
-
ISR集合中的每個副本都與leader保持同步狀態,不在裏面的保持不了同步狀態cdn
-
只有ISR中的副本纔有資格被選爲leader中間件
-
Producer寫入的消息只有被ISR中的副本都接收到,才被視爲"已提交" blog
-
Log End Offset:Producer 寫入到 Kafka 中的最新一條數據的 offset索引
-
High Watermark:已經成功備份到其餘 replicas 中的最新一條數據的 offset,也就是說 Log End Offset 與 High Watermark 之間的數據已經寫入到該 partition 的 leader 中,可是還未成功備份到其餘的 replicas 中
副本同步流程:
Controller
Controller相似於集羣的master,主要管理以下幾塊:
- Broker 的上線、下線處理
- topic 的分區擴容,處理分區副本的分配、leader 選舉
Controller經過broker搶佔zk臨時節點選舉出來,且controller與全部broker創建長鏈接
Controller管理partition leader選舉,主要有如下幾種方式:
選舉方式 |
說明 |
OfflinePartitionLeaderSelector |
leader 掉線時觸發 |
ReassignedPartitionLeaderSelector |
分區的副本從新分配數據同步完成後觸發的 |
PreferredReplicaPartitionLeaderSelector |
最優 leader 選舉,手動觸發或自動 leader 均衡調度時觸發 |
ControlledShutdownLeaderSelector |
broker 發送 ShutDown 請求主動關閉服務時觸發 |
消息冪等
問題:
- 在 0.11.0 以前,producer保證at least once
- at least once可能帶來重複數據 網絡請求延遲等致使的重試操做,在發送請求重試時 Server 端並不知道這條請求是否已經處理(沒有記錄以前的狀態信息),因此就會有可能致使數據請求的重複發送,這是 Kafka 自身的機制(異常時請求重試機制)致使的數據重複
解決方案:
- PID(Producer ID),用來標識每一個 producer client
- sequence numbers,client 發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷數據是否重複
Rebalance
kafka rebalance發生的5種狀況:
- 有新的消費者加入Consumer Group。
- 有消費者宕機下線。消費者並不必定須要真正下線,例如遇到長時間的GC、網絡延遲致使消費者長時間未向GroupCoordinator發送HeartbeatRequest時,GroupCoordinator會認爲消費者下線。
- 有消費者主動退出Consumer Group。
- Consumer Group訂閱的任一Topic出現分區數量的變化。
- 消費者調用unsubscrible()取消對某Topic的訂閱。
kafka經過GroupCoordinator管理rebalance操做
- GroupCoordinator是KafkaServer中用於管理Consumer Group的組件
- GroupCoordinator在ZooKeeper上添加Watcher
- 獲取GroupCoordinator:消費者會向Kafka集羣中的任一Broker發送ConsumerMetadataRequest
- 消費者鏈接到GroupCoordinator並週期性地發送HeartbeatRequest
- 若是HeartbeatResponse中帶有IllegalGeneration異常,說明GroupCoordinator發起了Rebalance操做,此時進入rebalance環節 Rebalance分爲兩個流程。
Join Group:
- Consumer首先向GroupCoordinator發送JoinGroupRequest請求,其中包含消費者的相關信息
- GroupCoordinator從中選取一個消費者成爲Group Leader,封裝成JoinGroupResponse返回給每一個消費者
- 只有Group Leader收到的JoinGroupResponse中封裝了全部消費者的信息, Group Leader根據消費者的信息以及選定的分區分配策略進行分區分配。
Sync Group:
- 每一個消費者會發送SyncGroupRequest到GroupCoordinator,可是隻有Group Leader的SyncGroupRequest請求包含了分區的分配結果
- GroupCoordinator根據Group Leader的分區分配結果,造成SyncGroupResponse返回給全部Consumer
- 消費者收到SyncGroupResponse後進行解析,便可獲取分配給自身的partition