kafka每一個topic有多個partition,單個partition內消息有序。Partition在物理存儲上由多個segment組成,每一個segment內包含兩個文件,index文件和log文件。
物理實體 index文件和log文件
邏輯實體 topic > partition > segment算法
1.partition存儲
在kafka文件存儲中,同一個Topic下有多個不一樣的partition,每一個partition爲一個目錄,partition命名規則爲topic名稱+有序序號,第一個partition序號從0開始,序號最大值爲partition數量減1。
每一個partition(目錄)至關於一個大型文件被平均分配到大小(可配置log.segment.bytes)相等的segment數據文件中,但每一個segment file消息數量不必定相等,取決於消息大小,方便快速刪除。
2.segment存儲
Segment file由兩個部分組成,分別是index file和data file,一一對應,成對出現,後綴爲.index和.log,分別對應索引文件和數據文件。Segment的文件命名第一個從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值。segmentfault
.index文件是索引文件,每行數據包括兩個值,第幾條消息+該消息在log文件的物理偏移量。.log文件存儲消息的實際數據,每行由offset+message組成。具體以下圖所示:緩存
message參數說明:負載均衡
關鍵字 解釋說明
8 byte offset 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱爲偏移(offset),它能夠惟一肯定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校驗message
1 byte 「magic" 表示本次發佈Kafka服務程序協議版本號
1 byte 「attributes" 表示爲獨立版本、或標識壓縮類型、或編碼類型。
4 byte key length 表示key的長度,當key爲-1時,K byte key字段不填
Kbyte key 可選
value bytes payload 表示實際消息數據。異步
以上圖中查找offset=36876的message爲例,須要經過如下兩個步驟:
第一步查找segment file,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0。第二個文件00000000000000368769.index的消息量起始偏移量爲368770 = 368769 + 1。一樣,第三個文件00000000000000737337.index的起始偏移量爲737338=737337 + 1,其餘後續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset 二分查找文件列表,就能夠快速定位到具體文件。當offset=368776時定位到00000000000000368769.index|logasync
第二步經過segment file查找message,經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到offset=368776爲止。編碼
生產者可與任一broker鏈接(生產者不會與zookeeper通訊),得到topic的partition信息(每一個broker都有全部topic信息),找到每一個partition的leader所在broker,再與該broker創建鏈接。發送消息時,經過輪詢或者隨機選取partition的方式,決定消息被髮送到哪個partition。
kafka的消息發送包括同步和異步兩種方式。同步發送可配置acks參數,該參數可配置消息的確認級別。當acks=-1,則要求全部ISR中的replica都肯定拿到消息後再返回給生產者成功(leader會先將消息落盤,ISR中的replica拿到後不必定落盤,到內存就算成功);acks=0則直接返回成功(不用leader確認);acks=1,則leader把消息落盤後再返回。異步發送則直接返回發送成功,由後臺線程掃描隊列長度,達到必定長度或者配置時間再批量發送消息到leader。spa
a. 建立topic時會往zk註冊topic的分區信息
b. 生產者從broker獲取topic的全部分區
c. 根據必定的負載均衡算法決定將消息發往哪一個分區
d. 最終根據分區所在的leader broker將消息發送到broker
e. 當topic分區變化時,生產者會從新從broker獲取新的分區信息
Kafka的消息生產者使用Producer.scala,客戶端經過producer.type配置可以使用sync和async兩種模式。客戶端調用Producer.send發送消息。
在同步模式下,首先調用DefaultEventHandler.handle方法,序列化消息,序列化方式是默認的Encoder,可自定義實現(producer配置serializer.class),以後在最大重試次數(默認三次)內嘗試發送消息,調用dispatchSerializedData方法,在該方法內選擇消息的partition。線程
若是消息沒有key,且是該客戶端對應topic下首條消息,則隨機選擇一個partition,並緩存對應的partition和topic的關係到sendPartitionPerTopicCache,以後該topic下沒有key的消息都將發往該分區。sendPartitionPerTopicCache將在對應的配置時間(topic.metadata.refresh.interval.ms,默認爲600000)內clear,防止全部消息都發往同一個partition。
若是消息key不爲空,則調用默認的分區方法DefaultPartitioner.partition。key hash以後的值再對分區值取模,獲得消息對應的分區。可自行實現Partitioner接口,實現自定義的分區策略(producer新增配置partitioner.class)。scala
消息到達broker後,leader先將該消息落盤。再根據acks參數決定是否返回消息寫入成功,若是acks=-1,則需等待ISR中的replica複製消息,所有複製完成後再返回成功,若是等待時間超時,則返回消息發送失敗。
若是要嚴格保證消息不丟失,可給該topic配置兩個以上replica,同時生產者的acks設置爲-1,每條消息都要求副本確認複製後再返回成功。
消息發送流程圖以下:
建立topic——kafka源碼探究之一
https://segmentfault.com/a/11...
broker的高可用及高伸縮——kafka源碼探究之二
https://segmentfault.com/a/11...