topic :主題名,用於對消息進行分類,是一個邏輯上的概念html
partition :是物理上的一個概念,一個topic能夠對應多個partition,消息實際存儲在partition正則表達式
partition 是一個有序,不可變的記錄序列。partition中的每一條消息都有一個序列號,稱之爲offset,offset 在一個partition內惟一,用於區別消息shell
segment :partition被分爲多個segment文件進行存儲apache
partition 的物理存儲結構編程
建立topic:test.show.log, 副本數2,分區數3,segment文件大小512bytebootstrap
# 建立topic bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic test.show.log --config segment.bytes=512 # 查看topic 狀態 bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test.show.log Topic:test.show.log PartitionCount:3 ReplicationFactor:2 Configs:segment.bytes=512 Topic: test.show.log Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: test.show.log Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: test.show.log Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
能夠看到broker.0 服務持有partition 0,1的副本,而且爲1 partition的leader數組
打開其日誌保存目錄能夠看到緩存
test.show.log-0 test.show.log-1
broker 對於topic 的每個 partition 使用單獨的目錄保存,每一個目錄下初始有服務器
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint
.log文件: segment 日誌文件網絡
.index文件: segment offset索引文件
.timeindex文件:segment timestamp索引文件
leader-epoch-checkpoint:用於副本備份機制
每個分區的日誌文件被分爲多個segment文件,segment文件的命名規則:
segment 文件記錄的信息
.index 文件:偏移量->消息在.log文件中的物理位置.timeindex 文件:時間戳T->.index文件中的偏移量offset,表示表示比T晚的全部消息偏移量都比offset大
根據offset查找文件的步驟
.index 使用 稀疏索引,kafka會在內存中維護一份索引,經過二分查找定位到消息所在的.log文件,以及在.log文件中的位置。
分區的意義
分段的意義
timeindex 的意義
消息格式通過了幾個版本的變動
具體見:
https://www.cnblogs.com/qwang...
https://kafka.apache.org/docu...
挑幾個字段來講明
key: 每一個消息均可以指定一個key。能夠經過給多個消息指定同一個key分發到同一個partition中,從而保證消息的有序性。headers:可變長消息頭,能夠不須要解析payload而拿到一些消息的屬性信息。
timestamp: 時間戳。時間戳有兩種類型:CreateTime,LogAppendTime。
能夠經過 broker 配置 log.message.timestamp.type來指定全局topic時間戳類型;
也能夠經過命令行建立topic時單獨指定該topic的時間戳類型。
生產者發送消息時,能夠指定消息的事件戳,若是未指定,則使用生產者客戶端當前時間。
kafka client 對 partition的讀寫都是直接訪問leader。那麼客戶端是如何找到leader的?
主要是經過發送一個稱之爲TopicMetadataRequest的請求來獲取。
TopicMetadataRequest => [TopicName] TopicName => string
Field Type Description TopicName []string 想要獲取的topic的元數組,若是爲空,則下發全部topic的元數據 MetadataResponse => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 Replicas => [int32] Isr => [int32]
Field Description Broker kafka broker的信息,包括broker id, hostname, port Isr 與leader 保持同步的broker id Leader leader的broker id,若是當前不存在leader(leader正在選舉中),則爲-1 Replicas 全部當前存活的follower
服務器MetaData的存儲後續研究
kafka 協議文檔地址:https://cwiki.apache.org/conf...
push :broker 控制傳輸速度,若是消費者處理速度較慢,會積壓大量消息,最終致使消費者拒絕提供服務
pull :適用於向消費者批量發送大量數據
pull 方式的不足:
消費者可能須要輪訓等待消息的到達,爲避免這種狀況,拉取請求中能夠指示等待給定數量的數據到達。
auto.offset.reset
consumer group && offset commit
consumer 做爲組進行消費時,須要記錄每一個分區消費的offset,以便於進行重平衡後,新的消費者能夠從上一個位置繼續消費。
這個offset是由consumer主動提交的,broker會記錄在稱之爲__consumer_offsets的topic中,其對應的partition爲 hash(group.id)%mode
提交方式
- 自動提交
enable.auto.commit=true
- 手動提交
一個partition,同一時間只會被同一group內的consumer消費,若是consumer數量大於partition數量,則多餘的consumer一直空閒
rebalance
consumer 的離開加入,partition數量的變化,以及訂閱topic數發生變動(能夠經過正則表達式訂閱多個topic)都會致使rebalance,rebalance 是由一個稱之爲coordinator的broker來負責的。
coordinator 的選取:
- 看offset保存在哪一個partition中
- 該partition的leader做爲該group的coordinator
rebalance 大體流程:
- coordinator 隨機選取一個consumer做爲leader,並將角色信息發給consumers,還會把follower的信息發送給leader。
- consumer leader 根據獲得的信息分配partition
- consumers 發送同步請求至coordinator,consumer leader 發送的請求中包含分配狀況
- coordinator 將分配狀況告訴全部consumer
replica
kafka 默認使用副本機制,將不須要備份的topic看作副本數爲1。kafka 備份的單元是topic partition。
follower 做爲 consumer 從 leader 拉取數據,而後應用到本身的log中。拉取方式可使follower 批量寫入本身的log。
Isr (in sync)狀態
需同時知足如下兩個條件
- 節點的session必須在zookeeper中(必須和zk保持鏈接,經過zookeeper的心跳機制)
- 若是是一個follwer,必須同步leader的寫入,而且不能太落後
leader 跟蹤 Isr 節點集,若是follower 死亡,卡住,或落後,leader 就會將他從 Isr 列表中刪除。卡住或滯後副本的肯定由replica.lag.time.max.ms配置項控制。
replica.lag.time.max.ms
If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isrlong
default 10000
定義:Isr集合內的全部節點將其寫入本身的log中
只有確認commited的消息纔會被consumer 消費,消費者沒必要擔憂會獲得一條會丟失的消息,也就是說只要消費者獲得了一條消息,那麼即便leader掛掉,該消息也不會丟失(由於已同步至Isr集合)。
對於producer來講,能夠在發送請求的即時性和消息的持久化之間進行權衡,來選擇是否等待消息commited。
這個選項能夠經過acks配置。
acks=-1|all 與min.insync.replicas 配合能夠最大程度保證消息的可靠性。
acks=-1|all 表示消息被追加到Isr集合內全部節點後才返回,若是當前Isr只有一個leader(follower因爲某些緣由掉線),那麼也會返回成功,隨後leader掛掉,其餘follower被選舉爲leader,那麼該條消息就會丟失。min.insync.replicas 能夠指定,若是當前Isr內節點數量小於min.insync.replicas指定數量,則producer直接拋出異常。
leader election
kafka controller.brokers中的一個節點會擔當controller的角色(聽說是經過zk建立節點,建立成功則爲controller,失敗則監聽該節點,若是controller掛掉,則再次競爭)
controller 負責管理整個集羣中分區和副本的狀態。
leader選舉
若是Isr中有至少一個replica倖存,則選擇其中一個爲leader。不然選擇該partition中的任意一個倖存的replica爲leader。
replica 都不工做
- 等待ISR中的任一個replica 活過來,而且選他做爲leader。
- 選擇第一個活過來的replica做爲leader。
第一種等待的時間可能會比較長,或者不可用。第二種不保證包含了全部已commited的消息。
須要在可用性和一致性當中作出折中。
unclean.leader.election.enable 配置指定使用哪一種方案,默認是true,使用第2種。
數據的一致性
待研究
日誌刪除策略
kafka對於過時日誌的刪除有兩種策略
delete
直接刪除過時的segment文件
compact
經過建立新的segment文件將相同key的最新一條消息保留下來(縮容,合併)
compact 效果圖:
應用場景
這個特性能夠保證日誌包含每一個key的最終值的完整快照,消費者就能夠從這個topic中恢復本身的狀態,而不須要保留全部更改的完整日誌。
待研究
首先確保 broker 配置項 log.cleaner.enable 爲 true.
能夠經過設置broker 配置項 log.cleanup.policy 爲 compact,默認是 delete.
能夠經過設置topic 配置項c leanup.policy 爲 compact,默認是delete.
topic 配置項能夠覆蓋broker全局配置.
log.cleanup.policy 和 cleanup.policy 取值能夠是compact或delete或compact,delete
線程模型
一個線程做爲acceptor,接受tcp鏈接,n個處理線程,每一個處理線程處理固定數量的鏈接。
協議
定長編碼int8, int16, int32, int64。大端序
變長編碼
length+content
字符串 length 用int16表示,二進制數組length用int32表示。
content爲空用length=-1表示
數組
sizeof(array)+array[0]+array[1]....+array[n]
數組大小使用int32表示
增長broker,增長partition,增長replication
待研究
producer請求的冪等性
冪等:
在編程中一個冪等操做的特色是其任意屢次執行所產生的影響均與一次執行的影響相同
producer 引入冪等性的意義:
防止生產者重複生產消息。生產者進行retry時重複產生消息,有了冪等性以後,在進行retry重試時,只會有一條消息commited。
實現方式:
- PID。每一個producer在初始化時會被分配一個惟一的PID,改PID對用戶不可見
- Sequence Number。producer 在向每一個paritition發送的每條數據都會攜帶一個seq。其從0遞增
- broker 會緩存PID及其seq。若是收到的消息seq比緩存seq大1(highwater mark)則接受,不然丟棄。(tcp ack)
非冪等
冪等
PID 的生成:
- producer 從任意一個broker獲取事務協調者(Transaction Coordinator)的信息
- producer 向 Transaction Coordinator 請求PID
Transaction Coordinator 的選擇
PID的生成規則及保存失效規則
注意事項:
若是使用kafka的冪等性,則必須開啓 topic 配置 enable.idempotenceWhen set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires
max.in.flight.requests.per.connection
to be less than or equal to 5,retries
to be greater than 0 andacks
must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, aConfigException
will be thrown.前提條件 acks必須是all
待研究
待研究