阿里太注重原理了:阿里問kafka如何實現高併發存儲-如何找到一條須要消費的數據,kafka用了稀疏索引的方式,使用了二分查找法,其實不少索引都是二分查找法html
二分查找法的時間複雜度:O(logn) redis,kafka,B+樹的底層都採用了二分查找法 java
參考:二分查找法 redis的索引底層的 跳錶原理 實現 聊聊Mysql索引和redis跳錶 ---redis的跳錶原理 時間複雜度O(logn)(阿里) mysql
參考:二分查找法 mysql索引原理:一步步分析爲何B+樹適合做爲索引的結構 以及索引原理 (阿里面試)linux
參考:二分查找法:各類排序算法的時間複雜度和空間複雜度(阿里)git
這是答案:github
例如讀取offset=368776的message,須要經過下面2個步驟查找。面試
第一步查找segment fileredis
上述圖2爲例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0.第二個文件00000000000000368769.index的消息量起始偏移量爲368770 = 368769 + 1.一樣,第三個文件00000000000000737337.index的起始偏移量爲737338=737337 + 1,其餘後續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就能夠快速定位到具體文件。算法
當offset=368776時定位到00000000000000368769.index|logsql
第二步經過segment file查找message
經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到 offset=368776爲止。
從上述圖3可知這樣作的優勢,segment index file採起稀疏索引存儲方式,它減小索引文件大小,經過mmap能夠直接內存操做,稀疏索引爲數據文件的每一個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來須要消耗更多的時間。
具體參考:
Kafka 社區很是活躍,從 0.9 版本開始,Kafka 的標語已經從「一個高吞吐量,分佈式的消息系統」改成"一個分佈式流平臺"。
Kafka 和傳統的消息系統不一樣在於:
Kafka 和其餘消息隊列的對比:
Kafka 架構原理
對於 Kafka 的架構原理,咱們先提出以下幾個問題:
Kafka 架構圖
Kafka 名詞解釋
在一套 Kafka 架構中有多個 Producer,多個 Broker,多個 Consumer,每一個 Producer 能夠對應多個 Topic,每一個 Consumer 只能對應一個 Consumer Group。
整個 Kafka 架構對應一個 ZK 集羣,經過 ZK 管理集羣配置,選舉 Leader,以及在 Consumer Group 發生變化時進行 Rebalance。
Topic 和 Partition
在 Kafka 中的每一條消息都有一個 Topic。通常來講在咱們應用中產生不一樣類型的數據,均可以設置不一樣的主題。
一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生產者寫入的新消息。
Kafka 爲每一個主題維護了分佈式的分區(Partition)日誌文件,每一個 Partition 在 Kafka 存儲層面是 Append Log。
任何發佈到此 Partition 的消息都會被追加到 Log 文件的尾部,在分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,也就是咱們的 Offset。Offset 是一個 Long 型的數字。
咱們經過這個 Offset 能夠肯定一條在該 Partition 下的惟一消息。在 Partition 下面是保證了有序性,可是在 Topic 下面沒有保證有序性。
在上圖中咱們的生產者會決定發送到哪一個 Partition:
若是沒有 Key 值則進行輪詢發送。
若是有 Key 值,對 Key 值進行 Hash,而後對分區數量取餘,保證了同一個 Key 值的會被路由到同一個分區;若是想隊列的強順序一致性,可讓全部的消息都設置爲同一個 Key。
消費模型
消息由生產者發送到 Kafka 集羣后,會被消費者消費。通常來講咱們的消費模型有兩種:
基於推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者後,標記這條消息爲已經被消費,可是這種方式沒法很好地保證消費的處理語義。
好比當咱們已經把消息發送給消費者以後,因爲消費進程掛掉或者因爲網絡緣由沒有收到這條消息,若是咱們在消費代理將其標記爲已消費,這個消息就***丟失了。
若是咱們利用生產者收到消息後回覆這種方法,消息代理須要記錄消費狀態,這種不可取。
若是採用 Push,消息消費的速率就徹底由消費代理控制,一旦消費者發生阻塞,就會出現問題。
Kafka 採起拉取模型(Poll),由本身控制消費速度,以及消費的進度,消費者能夠按照任意的偏移量進行消費。
好比消費者能夠消費已經消費過的消息進行從新處理,或者消費最近的消息等等。
網絡模型
Kafka Client:單線程 Selector
單線程模式適用於併發連接數小,邏輯簡單,數據量小的狀況。在 Kafka 中,Consumer 和 Producer 都是使用的上面的單線程模式。
這種模式不適合 Kafka 的服務端,在服務端中請求處理過程比較複雜,會形成線程阻塞,一旦出現後續請求就會沒法處理,會形成大量請求超時,引發雪崩。而在服務器中應該充分利用多線程來處理執行邏輯。
Kafka Server:多線程 Selector
在 Kafka 服務端採用的是多線程的 Selector 模型,Acceptor 運行在一個單獨的線程中,對於讀取操做的線程池中的線程都會在 Selector 註冊 Read 事件,負責服務端讀取請求的邏輯。
成功讀取後,將請求放入 Message Queue共享隊列中。而後在寫線程池中,取出這個請求,對其進行邏輯處理。
這樣,即便某個請求線程阻塞了,還有後續的線程從消息隊列中獲取請求並進行處理,在寫線程中處理完邏輯處理,因爲註冊了 OP_WIRTE 事件,因此還須要對其發送響應。
高可靠分佈式存儲模型
在 Kafka 中保證高可靠模型依靠的是副本機制,有了副本機制以後,就算機器宕機也不會發生數據丟失。
高性能的日誌存儲 kafka採用了稀疏索引的方式
Kafka 一個 Topic 下面的全部消息都是以 Partition 的方式分佈式的存儲在多個節點上。
同時在 Kafka 的機器上,每一個 Partition 其實都會對應一個日誌目錄,在目錄下面會對應多個日誌分段(LogSegment)。
LogSegment 文件由兩部分組成,分別爲「.index」文件和「.log」文件,分別表示爲 Segment 索引文件和數據文件。
先經過index文件,利用二分查找法,找到相應的稀疏索引,而後跟進index上的偏移量,找到log文件的位置,而後在log順序遍歷上面找到相應的文件;
例如讀取offset=368776的message,須要經過下面2個步驟查找。
第一步查找segment file
上述圖2爲例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0.第二個文件00000000000000368769.index的消息量起始偏移量爲368770 = 368769 + 1.一樣,第三個文件00000000000000737337.index的起始偏移量爲737338=737337 + 1,其餘後續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就能夠快速定位到具體文件。
當offset=368776時定位到00000000000000368769.index|log
第二步經過segment file查找message
經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到 offset=368776爲止。
從上述圖3可知這樣作的優勢,segment index file採起稀疏索引存儲方式,它減小索引文件大小,經過mmap能夠直接內存操做,稀疏索引爲數據文件的每一個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來須要消耗更多的時間。
Kafka部分名詞解釋以下:
Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。
Topic:一類消息,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。
Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。
Segment:partition物理上由多個segment組成,下面2.2和2.3有詳細說明。
offset:每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每一個消息都有一個連續的序列號叫作offset,用於partition惟一標識一條消息.
分析過程分爲如下4個步驟:
topic中partition存儲分佈
partiton中文件存儲方式
partiton中segment文件存儲結構
在partition中如何經過offset查找message
經過上述4過程詳細分析,咱們就能夠清楚認識到kafka文件存儲機制的奧祕。
假設實驗環境中Kafka集羣只有一個broker,xxx/message-folder爲數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如建立2個topic名稱分別爲report_push、launch_info, partitions數量都爲partitions=4(將一個topic分爲4個部分存儲)
存儲路徑和目錄規則爲:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存儲中,同一個topic下有多個不一樣partition,每一個partition爲一個目錄,partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。
若是是多broker分佈狀況,請參考文末kafka集羣partition分佈原理分析
下面示意圖形象說明了partition中文件存儲方式:
圖1
每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。
每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。
這樣作的好處就是能快速刪除無用文件,有效提升磁盤利用率。
讀者從2.2節瞭解到Kafka文件系統partition存儲方式,本節深刻分析partion中segment file組成和物理結構。
segment file組成:由2大部分組成,分別爲index file和data file,此2個文件一一對應,成對出現,後綴".index"和「.log」分別表示爲segment索引文件、數據文件.
segment文件命名規則:partion全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值。數值最大爲64位long大小,19位數字字符長度,沒有數字用0填充。
下面文件列表是筆者在Kafka broker上作的一個實驗,建立一個topicXXX包含1 partition,設置每一個segment大小爲500MB,並啓動producer向Kafka broker寫入大量數據,以下圖2所示segment文件列表形象說明了上述2個規則:
以上述圖2中一對segment file文件爲例,說明segment中index<—->data file對應關係物理結構以下:
上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。
其中以索引文件中元數據3,497爲例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址爲497。
從上述圖3瞭解到segment data file由許多message組成,下面詳細說明message物理結構以下:
圖4
參數說明:
關鍵字 | 解釋說明 |
---|---|
8 byte offset | 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱爲偏移(offset),它能夠惟一肯定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校驗message |
1 byte 「magic" | 表示本次發佈Kafka服務程序協議版本號 |
1 byte 「attributes" | 表示爲獨立版本、或標識壓縮類型、或編碼類型。 |
4 byte key length | 表示key的長度,當key爲-1時,K byte key字段不填 |
K byte key | 可選 |
value bytes payload | 表示實際消息數據。 |
實驗環境:
Kafka集羣:由2臺虛擬機組成
cpu:4核
物理內存:8GB
網卡:千兆網卡
jvm heap: 4GB
詳細Kafka服務端配置及其優化請參考:kafka server.properties配置詳解
圖5
從上述圖5能夠看出,Kafka運行時不多有大量讀磁盤的操做,主要是按期批量寫磁盤操做,所以操做磁盤很高效。這跟Kafka文件存儲中讀寫message的設計是息息相關的。Kafka中讀寫message有以下特色:
寫message
消息從java堆轉入page cache(即物理內存)。
由異步線程刷盤,消息從page cache刷入磁盤。
讀message
消息直接從page cache轉入socket發送出去。
當從page cache沒有找到相應數據時,此時會產生磁盤IO,從磁
盤Load消息到page cache,而後直接從socket發出去
Kafka高效文件存儲設計特色
Kafka把topic中一個parition大文件分紅多個小文件段,經過多個小文件段,就容易按期清除或刪除已經消費完文件,減小磁盤佔用。
經過索引信息能夠快速定位message和肯定response的最大大小。
經過index元數據所有映射到memory,能夠避免segment file的IO磁盤操做。
經過索引文件稀疏存儲,能夠大幅下降index文件元數據佔用空間大小。
topic------->多個partiton----------->1個partion多個segment----------->1個segment多個index和log
在只有一個broker的時候,多個partion位於這個broker,有多個broker的時候是按照必定的算法分佈在多個broker上。
說到分區,就要說kafka對消息的存儲.在官方文檔中.
分區讀寫日誌圖
首先,kafka是經過log(日誌)來記錄消息發佈的.每當產生一個消息,kafka會記錄到本地的log文件中,這個log和咱們平時的log有必定的區別.這裏能夠參考一下The Log,很少解釋.
這個log文件默認的位置在config/server.properties中指定的.默認的位置是log.dirs=/tmp/kafka-logs,linux不用說,windows的話就在你對應磁盤的根目錄下.我這裏是D盤.
分區partition
kafka是爲分佈式環境設計的,所以若是日誌文件,其實也能夠理解成消息數據庫,放在同一個地方,那麼必然會帶來可用性的降低,一掛全掛,若是全量拷貝到全部的機器上,那麼數據又存在過多的冗餘,並且因爲每臺機器的磁盤大小是有限的,因此即便有再多的機器,可處理的消息仍是被磁盤所限制,沒法超越當前磁盤大小.所以有了partition的概念.
kafka對消息進行必定的計算,經過hash來進行分區.這樣,就把一份log文件分紅了多份.如上面的分區讀寫日誌圖,分紅多份之後,在單臺broker上,好比快速上手中,若是新建topic的時候,咱們選擇了--replication-factor 1 --partitions 2,那麼在log目錄裏,咱們會看到
test-0目錄和test-1目錄.就是兩個分區了.
你可能會想,這特麼沒啥區別呀.注意,當有了多個broker以後,這個意義就存在了.這裏上一張圖,原文在參考連接裏有
這是一個topic包含4個Partition,2 Replication(拷貝),也就是說所有的消息被放在了4個分區存儲,爲了高可用,將4個分區作了2份冗餘,而後根據分配算法.將總共8份數據,分配到broker集羣上.
結果就是每一個broker上存儲的數據比全量數據要少,但每份數據都有冗餘,這樣,一旦一臺機器宕機,並不影響使用.好比圖中的Broker1,宕機了.那麼剩下的三臺broker依然保留了全量的分區數據.因此還能使用,若是再宕機一臺,那麼數據不完整了.固然你能夠設置更多的冗餘,好比設置了冗餘是4,那麼每臺機器就有了0123完整的數據,宕機幾臺都行.須要在存儲佔用和高可用之間作衡量.
至於宕機後,zookeeper會選出新的partition leader.來提供服務.這個等下篇文章
偏移offset
上一段說了分區,分區就是一個有序的,不可變的消息隊列.新來的commit log持續日後面加數據.這些消息被分配了一個下標(或者偏移),就是offset,用來定位這一條消息.
消費者消費到了哪條消息,是保持在消費者這一端的.消息者也能夠控制,消費者能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.也能夠重置offset
如何經過offset算出分區
其實partition存儲的時候,又分紅了多個segment(段),而後經過一個index,索引,來標識第幾段.這裏先能夠去看一下本地log目錄的分區文件夾.
在我這裏,test-0,這個分區裏面,會有一個index文件和一個log文件,
index和log
對於某個指定的分區,假設每5個消息,做爲一個段大小,當產生了10條消息的狀況想,目前有會獲得(只是解釋)
0.index (表示這裏index是對0-4作的索引)
5.index (表示這裏index是對5-9作的索引)
10.index (表示這裏index是對10-15作的索引,目前還沒滿)
和
0.log
5.log
10.log
,當消費者須要讀取offset=8的時候,首先kafka對index文件列表進行二分查找,能夠算出.應該是在5.index對應的log文件中,而後對對應的5.log文件,進行順序查找,5->6->7->8,直到順序找到8就行了.
以上是Kafka文件存儲機制及partition和offset的所有內容,在雲棲社區的博客、問答、雲棲號、人物、課程等欄目也有Kafka文件存儲機制及partition和offset的相關內容,歡迎繼續使用右上角搜索按鈕進行搜索存儲 , 文件 , 數據 , 索引 , 磁盤 物理 kafka offset保存機制、kafka offset 存儲、kafka partition、kafka partition 設置、kafka partition 數量,以便於您獲取更多的相關知識。
副本機制
Kafka 的副本機制是多個服務端節點對其餘節點的主題分區的日誌進行復制。
當集羣中的某個節點出現故障,訪問故障節點的請求會被轉移到其餘正常節點(這一過程一般叫 Reblance)。
Kafka 每一個主題的每一個分區都有一個主副本以及 0 個或者多個副本,副本保持和主副本的數據同步,當主副本出故障時就會被替代。
在 Kafka 中並非全部的副本都能被拿來替代主副本,因此在 Kafka 的 Leader 節點中維護着一個 ISR(In Sync Replicas)集合。
翻譯過來也叫正在同步中集合,在這個集合中的須要知足兩個條件:
另外還有個 AR(Assigned Replicas)用來標識副本的全集,OSR 用來表示因爲落後被剔除的副本集合。
因此公式以下:ISR = Leader + 沒有落後太多的副本;AR = OSR+ ISR。
這裏先要說下兩個名詞:HW(高水位)是 Consumer 可以看到的此 Partition 的位置,LEO 是每一個 Partition 的 Log ***一條 Message 的位置。
HW 能保證 Leader 所在的 Broker 失效,該消息仍然能夠重新選舉的 Leader 中獲取,不會形成消息丟失。
當 Producer 向 Leader 發送數據時,能夠經過 request.required.acks 參數來設置數據可靠性的級別:
可是這樣也不能保證數據不丟失,好比當 ISR 中只有 Leader 時(其餘節點都和 ZK 斷開鏈接,或者都沒追上),這樣就變成了 acks = 1 的狀況。
高可用模型及冪等
在分佈式系統中通常有三種處理語義:
at-least-once
至少一次,有可能會有屢次。若是 Producer 收到來自 Ack 的確認,則表示該消息已經寫入到 Kafka 了,此時恰好是一次,也就是咱們後面的 Exactly-once。
可是若是 Producer 超時或收到錯誤,而且 request.required.acks 配置的不是 -1,則會重試發送消息,客戶端會認爲該消息未寫入 Kafka。
若是 Broker 在發送 Ack 以前失敗,但在消息成功寫入 Kafka 以後,這一次重試將會致使咱們的消息會被寫入兩次。
因此消息就不止一次地傳遞給最終 Consumer,若是 Consumer 處理邏輯沒有保證冪等的話就會獲得不正確的結果。
在這種語義中會出現亂序,也就是當***次 Ack 失敗準備重試的時候,可是第二消息已經發送過去了,這個時候會出現單分區中亂序的現象。
咱們須要設置 Prouducer 的參數 max.in.flight.requests.per.connection,flight.requests 是 Producer 端用來保存發送請求且沒有響應的隊列,保證 Produce r端未響應的請求個數爲 1。
at-most-once
若是在 Ack 超時或返回錯誤時 Producer 不重試,也就是咱們講 request.required.acks = -1,則該消息可能最終沒有寫入 Kafka,因此 Consumer 不會接收消息。
exactly-once
恰好一次,即便 Producer 重試發送消息,消息也會保證最多一次地傳遞給 Consumer。該語義是最理想的,也是最難實現的。
在 0.10 以前並不能保證 exactly-once,須要使用 Consumer 自帶的冪等性保證。0.11.0 使用事務保證了。
如何實現 exactly-once
要實現 exactly-once 在 Kafka 0.11.0 中有兩個官方策略:
單 Producer 單 Topic
每一個 Producer 在初始化的時候都會被分配一個惟一的 PID,對於每一個惟一的 PID,Producer 向指定的 Topic 中某個特定的 Partition 發送的消息都會攜帶一個從 0 單調遞增的 Sequence Number。
在咱們的 Broker 端也會維護一個維度爲,每次提交一次消息的時候都會對齊進行校驗:
上面所說的解決了兩個問題:
上面所說的都是在同一個 PID 下面,意味着必須保證在單個 Producer 中的同一個 Seesion 內,若是 Producer 掛了,被分配了新的 PID,這樣就沒法保證了,因此 Kafka 中又有事務機制去保證。
事務
在 Kafka 中事務的做用是:
事務能夠保證就算跨多個,在本次事務中的對消費隊列的操做都當成原子性,要麼所有成功,要麼所有失敗。
而且,有狀態的應用也能夠保證重啓後從斷點處繼續處理,也即事務恢復。
在 Kafka 的事務中,應用程序必須提供一個惟一的事務 ID,即 Transaction ID,而且宕機重啓以後,也不會發生改變。
Transactin ID 與 PID 可能一一對應,區別在於 Transaction ID 由用戶提供,而 PID 是內部的實現對用戶透明。
爲了 Producer 重啓以後,舊的 Producer 具備相同的 Transaction ID 失效,每次 Producer 經過 Transaction ID 拿到 PID 的同時,還會獲取一個單調遞增的 Epoch。
因爲舊的 Producer 的 Epoch 比新 Producer 的 Epoch 小,Kafka 能夠很容易識別出該 Producer 是老的,Producer 並拒絕其請求。
爲了實現這一點,Kafka 0.11.0.0 引入了一個服務器端的模塊,名爲 Transaction Coordinator,用於管理 Producer 發送的消息的事務性。
該 Transaction Coordinator 維護 Transaction Log,該 Log 存於一個內部的 Topic 內。
因爲 Topic 數據具備持久性,所以事務的狀態也具備持久性。Producer 並不直接讀寫 Transaction Log,它與 Transaction Coordinator 通訊,而後由 Transaction Coordinator 將該事務的狀態插入相應的 Transaction Log。
Transaction Log 的設計與 Offset Log 用於保存 Consumer 的 Offset 相似。
***
關於消息隊列或者 Kafka 的一些常見的面試題,經過上面的文章能夠提煉出如下幾個比較經典的問題,大部分問題均可以從上面總結後找到答案:
A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
如上圖,一個Topic有四個Partition,每一個Partition兩個replication。
Zookeeper在Kakfa中扮演的角色Kafka將元數據信息保存在Zookeeper中,可是發送給Topic自己的數據是不會發到Zk上的,不然Zk就瘋了。
[zk: localhost:2181(CONNECTED) 0] ls / [admin, consumers, config, brokers]
參考:kafka工做原理