Kafka消息生成,消費,存儲機制

 Kafka是最初由Linkedin公司開發,是一個分佈式、分區的、多副本的、多訂閱者,基於zookeeper協調的分佈式日誌系統(也能夠當作MQ系統),常見能夠用於web/nginx日誌、訪問日誌,消息服務等等,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。nginx

       今天我會從幾個重要的環節去介紹Kafka的一些基本特性。Kafka是分佈式的,因此內容消息一般是分佈在各個機器上,通常消息會發送到topic中,一個topic一般由多個partition,kafka把每一個topic的每一個partition均勻的分佈在集羣中的不一樣服務器上.因此從總體來看,Kafka的邏輯關係就是:生產者向topic中的某個partition發送消息,消費者從partition獲取消息。程序員

Kafka基本概念

  • Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。
  • Topic:一類消息,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。
  • Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。
  • Segment:partition物理上由多個segment組成。
  • offset:每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每一個消息都有一個連續的序列號叫作offset,用於partition惟一標識一條消息。

Kafka消息發送的機制

      每當用戶往某個Topic發送數據時,數據會被hash到不一樣的partition,這些partition位於不一樣的集羣節點上,因此每一個消息都會被記錄一個offset消息號,隨着消息的增長逐漸增長,這個offset也會遞增,同時,每一個消息會有一個編號,就是offset號。消費者經過這個offset號去查詢讀取這個消息。web

發送消息流程算法

  首先獲取topic的全部Patition數據庫

  若是客戶端不指定Patition,也沒有指定Key的話,使用自增加的數字取餘數的方式實現指定的Partition。這樣Kafka將平均的向Partition中生產數據。緩存

  若是想要控制發送的partition,則有兩種方式,一種是指定partition,另外一種就是根據Key本身寫算法。繼承Partitioner接口,實現其partition方法。服務器

 

Kafka消息消費機制 

        kafka 消費者有消費者族羣的概念,當生產者將數據發佈到topic時,消費者經過pull的方式,按期從服務器拉取數據,固然在pull數據的時候,,服務器會告訴consumer可消費的消息offset。網絡

      建立一個Topic (名爲topic1),再建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,而後經過 Producer向topic1發送Key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時 group2中的3個Consumer分別收到了Key爲1,2,3的消息,以下圖所示。數據結構

        

           結論:不一樣 Consumer Group下的消費者能夠消費partition中相同的消息,相同的Consumer  Group下的消費者只能消費partition中不一樣的數據。app

                    topic的partition的個數和同一個消費組的消費者個數最好一致,若是消費者個數多於partition個數,則會存在有的消費者消費不到數據。

           服務器會記錄每一個consumer的在每一個topic的每一個partition下的消費的offset,而後每次去消費去拉取數據時,都會從上次記錄的位置開始拉取數據。好比0.8版本的用zookeeper來記錄    

         /{comsumer}/{group_name}/{id}/{consumer_id}  //記錄id

         /{comsumer}/{group_name}/{offset}/}{topic_name}/{partitions_id}  //記錄偏移量

        /{comsumer}/{group_name}/{owner}/}{topic_name}/{partitions_id}  //記錄分區屬於哪一個消費者

  當consumer和partition增長或者刪除時,須要從新執行一遍Consumer Rebalance算法

 

 Consumer Rebalance的算法以下

  • 將目標Topic下的全部Partirtion排序,存於PT
  • 對某Consumer Group下全部Consumer排序,存於CG,第i個Consumer記爲Ci
  • N=size(PT)/size(CG),向上取整
  • 解除Ci對原來分配的Partition的消費權(i從0開始)
  • 將第i∗N到(i+1)∗N−1個Partition分配給Ci

Kafka消息存儲機制 

      kafka的消息是存儲在磁盤的,因此數據不易丟失, 如上了解,partition是存放消息的基本單位,那麼它是如何存儲在文件當中的呢,如上:topic-partition-id,每一個partition都會保存成一個文件,這個文件又包含兩部分。 .index索引文件、.log消息內容文件。

index文件結構很簡單,每一行都是一個key,value對
key 是消息的序號offset,value 是消息的物理位置偏移量.   index索引文件 (offset消息編號-消息在對應文件中的偏移量)

  

好比:要查找.index文件中offseet 爲7的 Message(全局消息爲id 368776):

  1. 首先是用二分查找肯定它是在哪一個LogSegment中,天然是在第一個Segment中。
  2. 打開這個Segment的index文件,也是用二分查找找到offset小於或者等於指定offset的索引條目中最大的那個offset。天然offset爲6的那個索引是咱們要找的,經過索引文件咱們知道offset爲6的Message在數據文件中的位置爲1407
  3. 打開數據文件.log,從位置爲1407的那個地方開始順序掃描直到找到 .index文件中offseet 爲7(全局消息爲id 368776)的那條Message(offset 1508)。

  這套機制是創建在offset是有序的。索引文件被映射到內存中,因此查找的速度仍是很快的。

 這是一種稀疏索引文件機制,並無把每一個消息編號和文件偏移量記錄下來,而是稀疏記錄一部分,這樣能夠方式索引文件佔據過多空間。每次查找消息時,須要將整塊消息讀入內存,而後獲取對應的消息。

 好比消息offset編號在36,37,38的消息,都會經過38找到對應的offset

 

Kafka數據存儲格式

從上述瞭解到.log由許多message組成,下面詳細說明message物理結構以下:

參數說明:

關鍵字             解釋說明
8 byte offset 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱爲偏移(offset),它能夠惟一肯定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message
4 byte    message size                                                            message大小
4 byte CRC32 用crc32校驗message
1 byte 「magic」 表示本次發佈Kafka服務程序協議版本號
1 byte 「attributes」 表示爲獨立版本、或標識壓縮類型、或編碼類型。
4 byte key length 表示key的長度,當key爲-1時,K byte key字段不填
K byte key 可選
value bytes payload 表示實際消息數據。

 

 

日誌更新和清理

        Kafka中若是消息有key,相同key的消息在不一樣時刻有不一樣的值,則只容許存在最新的一條消息,這就比如傳統數據庫的update操做,查詢結果必定是最近update的那一條,而不該該查詢出多條或者查詢出舊的記錄,固然對於HBase/Cassandra這種支持多版本的數據庫而言,update操做可能致使添加新的列,查詢時是合併的結果而不必定就是最新的記錄。圖3-27中示例了多條消息,一旦key已經存在,相同key的舊的消息會被刪除,新的被保留。以下圖,就是對日誌更新而後壓縮。

 清理後Log Head部分每條消息的offset都是逐漸遞增的,而Tail部分消息的offset是斷斷續續的。 LogToClean 表示須要被清理的日誌

       生產者客戶端若是發送的消息key的value是空的,表示要刪除這條消息, 發生在刪除標記以前的記錄都須要刪除掉,而發生在刪除標記(Cleaner Point)以後的記錄則不會被刪除。

消息檢索過程示例

例如讀取offset=368的消息
(1)找到第368條消息在哪一個segment
從partition目錄中取得全部segment文件的名稱,就至關於獲得了各個序號區間

 例若有3個segment
           00000000000000000000.index
           00000000000000000000.log
           00000000000000000300.index
           00000000000000000300.log
           00000000000000000600.index
           00000000000000000600.log
    根據二分查找,能夠快速定位,第368條消息是在00000000000000000300.log文件中

(2)00000000000000000300.index文件中找到其物理偏移量
     讀取 00000000000000000300.index 。以 68 (368-300的值)爲key,獲得value,如299,就是消息的物理位置偏移量

(3)到log文件中讀取消息內容
    讀取 00000000000000000300.log  從偏移量299開始讀取消息內容。完成了消息的檢索過程

Kafka日誌磁盤存儲優於內存

其實Kafka最核心的思想是使用磁盤,而不是使用內存,可能全部人都會認爲,內存的速度必定比磁盤快,我也不例外。在看了Kafka的設計思想,查閱了相應資料再加上本身的測試後,發現磁盤的順序讀寫速度和內存持平。

並且Linux對於磁盤的讀寫優化也比較多,包括read-ahead和write-behind,磁盤緩存等。若是在內存作這些操做的時候,一個是JAVA對象的內存開銷很大,另外一個是隨着堆內存數據的增多,JAVA的GC時間會變得很長,使用磁盤操做有如下幾個好處:

  • 磁盤緩存由Linux系統維護,減小了程序員的很多工做。
  • 磁盤順序讀寫速度超過內存隨機讀寫。
  • JVM的GC效率低,內存佔用大。使用磁盤能夠避免這一問題。
  • 系統冷啓動後,磁盤緩存依然可用。

另外可參考知乎的一篇文章:如何利用磁盤順序讀寫快於內存隨機讀寫這一現象?https://www.zhihu.com/question/48794778

 

Kafka 性能設計

     一個topic就是個table,table會動態增加,並且只是追加,在集羣中有不少table,訪問時訪問table中的數據,有個巨大的優點是,只會在最新的基礎上追加數據,因此不會有衝突,不須要加鎖。徹底可使用磁盤的順序讀寫,比隨機讀寫快10000倍。

       Kafka中用到了sendfile機制,隨機讀寫是每秒k級別的,若是是線性讀寫可能能到每秒上G,kafka在實現時,速度很是快,是由於會把數據當即寫入文件系統的持久化日誌中,不是先寫在緩存中,再flush到磁盤中。也就是說,數據過來的時候,是傳輸在os kernel的頁面緩存中,由os刷新到磁盤中。在os採用sendfile的機制,os能夠從頁面緩存一步發送數據到網絡中,同時,kafka支持gzip和Snappy對數據進行壓縮,這個對傳輸數據相當重要。

        數據存儲採用topic-partition-record的三層體系,是個樹狀數據結構。對於樹的存儲,比較經常使用的是B tree,運行時間是O(logN),可是在由於須要鎖定機制,在磁盤層面,在高速交換、數據規模比較大的時候,性能損耗仍是比較厲害的。Kafka的方式是把全部消息當作普通的日誌,理念就是把日誌內容簡單的追加,採用offset讀取數據,優點是性能徹底是線性的,和數據大小沒有關係,同時,讀取操做和寫入操做不會互相阻塞,性能能永遠達到最大化。

相關文章
相關標籤/搜索