2018年第52周-Kafka知識點

預祝2019年元旦節快樂!
2018年最後一週,分享些Kafka的知識點。

Kafka數據分區和消費者的關係

分區(partition)

topic是邏輯概念,分區(partition)是物理概念,對於用戶來講是透明的。producer只須要關心消息網哪一個topic發送,而consumer之關係本身訂閱哪一個topic,不須要關心每條消息存於整個集羣的哪一個Broker。 html

爲了性能考慮,若是topic的消息都放在一個Broker,這個Broker必然稱爲瓶頸,並且沒法作到水平擴展。因此topic內的數據分佈到整個集羣就是個天然而然的設計了。分區的引入就是解決水平擴展問題的一個解決方案。 node

Kafka儘可能將全部的分區均勻分配到整個集羣上。基本算法以下:算法

  1. 將全部存貨的N個Broker和待分配的分區排序。
  2. 將第i個分區分配到第(i mon n)個Broker上,這個分區的第一個副本(Replica)存在於這個分配的Broker上,而且會做爲分區的優先副本。
  3. 將第i個分區的第j個副本分配到第(i+j)mod n 個Broker上。

但實際狀況Kafka的算法是上述基礎上再加些,看Kafka的函數assignReplicasToBrokers。變了兩點:apache

  1. 限制了副本因子(replication factor)不得大於Broker的個數。由於當j大於n時,就會存在一個Broker有兩個副本,這沒意義且浪費。
  2. 起始位置不是第0個Broker,是第rand.nextInt(Brokers.size)個。那是由於(i mon n)形成的問題就是,永遠都會有0,未必有n。因此必須加上隨機數,也就是第i個分區分配到第(i mon n)+rand.nextInt(Brokers.size)個Broker上。
  3. 若是考慮多機架問題,那麼Broker順序就未必是0,1,2,3,4,5。而是若是0,1,2是機架A,3,4,5是機架B。則Broker的順序爲0,3,1,4,2,5。錯開,每一個機架依次選一次。因此當副本因子爲3時,保證每個分區在各個機架都至少有一個副本。

分區中副本通常都是 Leader,其他的都是 Follow 副本。生產者消費者都固定在 Leader進行生產和消費。api

分區與生產者

 負載均衡(Load balancing)  

生產者直接發送數據到Broker,不須要任何的中間路由層,而接受的Broker是該分區的Leader。
爲了幫助生產者實現這一點,全部Broker均可以回答關於哪些是可用服務器的元數據的請求,以及在任何給定的時間內,某個主題的分區的Leader是否容許生產者適當地發送它的請求。
客戶端能夠控制往哪一個分區生產消息。這能夠隨機地進行,實現一種隨機的負載平衡,或者能夠經過一些語義分區函數來實現負載平衡
Kafka提供了語義分區的接口,容許用戶指定一個分區的key,並使用這個key來作hash到一個分區(若是須要的話,也是能夠複寫這分區功能的)。例如,咱們選擇user的id做爲可用,則因此該用戶的信息都會發送到一樣的分區。數組

異步發送(Asynchronus send)

批處理是效率的主要驅動因素之一,爲了可以批處理,Kafka的生產者會嘗試在內存中積累數據,而後在一塊兒在一個請求中以大批量的形式發送出去。批處理這個能夠設置按固定的消息數量或按特定的延遲(64k或10ms)。這容許累積更多字節的發送出去,這樣只是在服務器上作少許的大IO操做。這種緩衝是可配置的,這樣提供了一種機制來以額外的延遲來提升吞吐量。具體的配置)和生產者的api能夠在這文檔中找到。緩存

分區與消費者

消費者的工做方式是,向分區的Leader發送「fetch」請求。在每一個請求中消費者指定日誌的偏移量(position),而後接受回一大塊從偏移量開始的日誌。所以,消費者對偏移量有重要的控制權,若是須要,能夠重置偏移量來從新消費數據。bash

Push和pull

咱們首先考慮的一個問題是,消費者應該是從Broker拉取消息,仍是應該是Broker把消息推送給消費者。在這方面,Kafka遵循了一種更傳統的設計,大多數消息隊列系統也會用的,那就是數據是從生產者push到Broker,消費者是從Broker拉取數據。一些日誌集中系統,如Scribe和Apache Flume,遵循一個很是不一樣的,基於推送的路徑,將數據被推到下游。這兩種方法都由利弊,在基於推送的系統,因爲是Broker得控制數據傳輸的速率,不一樣消費者可能要不一樣的速率。然而消費者通常的目的都是讓消費者本身可以以最大的速度進行消費,但在基於push的系統,當消費速率低於生產效率時,消費者就不知道該怎麼辦好了(本質上就是一種拒絕服務攻擊(DOS))。一個基於pull的系統就擁有很好的熟悉,消費者能夠簡單的調控速率。服務器

基於pull的系統的另外一個優勢是,它能夠對發送給消費者的數據進行聚合的批處理。基於推送的系統必須選擇當即發送請求或積累更多數據,而後在不知道下游用戶是否可以當即處理它的狀況下發送它。網絡

基於pull的系統的缺點是,若是Broker沒數據,則消費者可能會不停的輪訓。爲了不這一點,咱們在pull請求上提供了參數,容許消費者在「長輪訓」中阻塞,直到數據達到(而且能夠選擇等待,直到必定數量的本身能夠,確保傳輸的大小)。

消費者的Position(Consumer Position)

使人驚訝的是,跟蹤消息是否使用了,是消息隊列系統的關鍵性能點之一。
不少消息隊列系統在Broker中保存了關於什麼消息是被消費了的元數據。也就是說,當消息隊列給消費者時,Broker要麼當即記錄信息到本地,要麼就是等待消費者的確認。這是一個至關直觀的選擇,並且對於一臺機器服務器來講,很清楚地知道這些消息的狀態。因爲許多消息隊列系統中用於存儲的數據結構都很糟糕,所以這(記錄消息狀態)也是一個實用的選擇——由於Broker知道什麼是已經被消費的,因此能夠當即刪除它,保持數據的大小。

讓Broker和消費者就已經消費的東西達成一致,這可不是小問題。若是一條消息發送到網絡上,Broker就把它置爲已消費,但消費者可能處理這條消息失敗了(或許是消費者掛了,也或許是請求超時等),這條消息就會丟失了。爲了解決這個問題,不少消息隊列系統增長了確認機制。當消息被髮送時,是被標誌爲已發送,而不是已消費;這是Broker等待消費者發來特定的確認信息,則將消息置爲已消費。這個策略雖然解決了消息丟失的問題,但卻帶來了新的問題。第一,若是消費者在發送確認信息以前,在處理完消息以後,消費者掛了,則會致使此消息會被處理兩次。第二個問題是關於性能,Broker必須保存每一個消息的不一樣狀態(首先先鎖住消息以至於不會讓它發送第二次,其次標誌位已消費從而能夠刪除它)。還有些棘手的問題要處理。如消息被髮送出去,但其確認信息一直沒返回。

Kafka處理則不同。咱們的主題被分爲一個有序分區的集合,且每一個分區在任何給定的時間內只會被訂閱它的消費者組中的一個消費者給使用。這意味着每一個分區中的消費者的position僅僅是一個整數,這是下一次消費時,消息的偏移量。這使狀態(記錄是否被消費)很是小,每一個分區只有一個數字。這個狀態能夠被按期檢查。這樣確認一條消息是否被消費的成本就很低。

這樣還附加了一個好處。消費者能夠重置其最早的position從而從新消費數據。這雖然違反了隊列的公共契約,但它卻變成關鍵功能給許多消費者。例如,若是消費者代碼有一個bug,而且在一些消息被消費後才被發現,那麼當bug被修復後,消費者就能夠從新使用這些消息。

消費組

每羣消費者都會被標誌有消費組名。有消費組這個概念,Kafka就能夠實現相似與工做隊列(Worke Queues)模式和發佈/訂閱(Publish/Subscribe)。
若是消費者都在同一個消費組,則消息則會負載均衡的分配每一個消費者,一條消息不會分配個兩個及以上的消費者。
若是消費者不在同一個組,則消息會被廣播到每個消費組中。

Kafka的數據offset讀取流程

每一個消息在分區中都是被分配一個有序的ID數字,而這數字,咱們稱之爲偏移量(offset)。在一個分區上,offset惟一標識一個消息。
由每一個消費者維護offset。

在Kafka文件存儲中,同一個topic下有多個不一樣分區,每一個分區爲一個目錄,分區命名規則爲topic名稱+有序序號,第一個分區序號從0開始,序號最大值爲分區數量減1。

partition物理上由多個大小相等的segment組成。segment由2大部分組成,分別爲index file和data file,此2個文件一一對應,成對出現,後綴".index"和「.log」分別表示爲segment索引文件、數據文件.
segment文件命名規則:partion全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值。數值最大爲64位long大小,19位數字字符長度,沒有數字用0填充。

00000000000000000.index
00000000000000000.log
00000000000368769.index
00000000000368769.log
00000000000737337.index
00000000000737337.log
00000000001105814.index
00000000001105814.log

index file的結構:

1,0
3,497
6,1407
8,1686
....
N,position

index file結構是兩個數字兩個數字一組,N,position。N用於查找相對於當前文件名的offset值的N個消息。如00000000000368769.index的3,497,則爲368769+3=第368772個消息。而position 497是指data file的偏移量497。

data file由許多message組成,message物理結構以下:

8 byte offset
4 byte message size
4 byte CRC32
1 byte "magic"
1 byte "attributes"
4 byte key length
K byte key
4 byte payload length
value bytes payload

這樣的結構,配合index file,很快就能夠知道某條消息的大小。

 在partition中如何經過offset查找message

例如讀取offset=368776的message,須要經過下面2個步驟查找。

第一步查找segment file

上述爲例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0.第二個文件00000000000000368769.index的消息量起始偏移量爲368770 = 368769 + 1.一樣,第三個文件00000000000000737337.index的起始偏移量爲737338=737337 + 1,其餘後續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset 二分查找文件列表,就能夠快速定位到具體文件。
當offset=368776時定位到00000000000000368769.index|log

第二步經過segment file查找message

經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到offset=368776爲止。

從上述圖3可知這樣作的優勢,segment index file採起稀疏索引存儲方式,它減小索引文件大小,經過mmap能夠直接內存操做,稀疏索引爲數據文件的每一個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來須要消耗更多的時間。

Kafka高效文件存儲設計特色

  • Kafka把topic中一個parition大文件分紅多個小文件段,經過多個小文件段,就容易按期清除或刪除已經消費完文件,減小磁盤佔用。
  • 經過索引信息能夠快速定位message和肯定response的最大大小。
  • 經過index元數據所有映射到memory,能夠避免segment file的IO磁盤操做。
  • 經過索引文件稀疏存儲,能夠大幅下降index文件元數據佔用空間大小。

注: 稀疏索引相似於帶一級索引的跳錶,可是一級索引是數組可使用二分法查找。

注:mmap()函數是Linux的文件空間映射函數,用來將文件或設備空間映射到內存中,能夠經過映射後的內存空間存取來得到與存取文件一致的控制方式,沒必要再使用read(),write()函數。
mmap和常規文件操做的區別
回顧一下常規文件系統操做(調用read/fread等類函數)中,函數的調用過程:

  1. 進程發起讀文件請求。
  2. 內核經過查找進程文件符表,定位到內核已打開文件集上的文件信息,從而找到此文件的inode。
  3. inode在address_space上查找要請求的文件頁是否已經緩存在頁緩存中。若是存在,則直接返回這片文件頁的內容。
  4. 若是不存在,則經過inode定位到文件磁盤地址,將數據從磁盤複製到頁緩存。以後再次發起讀頁面過程,進而將頁緩存中的數據發給用戶進程。

總結來講,常規文件操做爲了提升讀寫效率和保護磁盤,使用了頁緩存機制。這樣形成讀文件時須要先將文件頁從磁盤拷貝到頁緩存中,因爲頁緩存處在內核空間,不能被用戶進程直接尋址,因此還須要將頁緩存中數據頁再次拷貝到內存對應的用戶空間中。這樣,經過了兩次數據拷貝過程,才能完成進程對文件內容的獲取任務。寫操做也是同樣,待寫入的buffer在內核空間不能直接訪問,必需要先拷貝至內核空間對應的主存,再寫回磁盤中(延遲寫回),也是須要兩次數據拷貝。
而使用mmap操做文件中,建立新的虛擬內存區域和創建文件磁盤地址和虛擬內存區域映射這兩步,沒有任何文件拷貝操做。而以後訪問數據時發現內存中並沒有數據而發起的缺頁異常過程,能夠經過已經創建好的映射關係,只使用一次數據拷貝,就從磁盤中將數據傳入內存的用戶空間中,供進程使用。
總而言之,常規文件操做須要從磁盤到頁緩存再到用戶主存的兩次數據拷貝。而mmap操控文件,只須要從磁盤到用戶主存的一次數據拷貝過程。說白了,mmap的關鍵點是實現了用戶空間和內核空間的數據直接交互而省去了空間不一樣數據不通的繁瑣過程。所以mmap效率更高。

函數原型

void *mmap(void *start, size_t length, int prot, int flags, int fd, off_t offset);

也就是能夠將大數據的文件,局部映射到內存中,在內存中進行此部分文件的操做。對此內存操做,都不涉及到內核空間到用戶空間之間交互。直接操做內存,內存直接寫入(讀取)文件。就只有一次IO。 若是是普通文件操做,則須要文件複製到內核,再由內核複製到用戶空間,用戶空間才能操做。從而達到零拷貝
換句話說,但凡是須要用磁盤空間代替內存的時候,mmap均可以發揮其功效。

Kafka內部如何保證順序,結合外部組建如何保證消費者的順序

Kafka中每一個分區都是有序,因爲Kafka的消息是不可變的,因此都是追加的形式有序的往上加消息。這個結構體叫 結構化提交日誌(a structured commit log)。

首先就要考慮是否真的須要全部消息在隊列都得有序。通常狀況,不止通常,而是很大一部分,是能夠無序的。就跟分佈式同樣。有不少業務,看起來是同步的,靜下來慢慢思考,就會發現不少東西是能夠異步執行的。
若是實在有這樣保證順序的須要,保證生產者需將有序地提交給一個分區,首先是生產者不能提交錯順序。其次,消費者組就不能擁有兩個或以上消費者實例了。連兩個或以上的消費者組也不能有。

持久化

Kafka會根據保留時間這參數,持久化全部已經收到的消息。雖然能夠設置保留時間這參數,可是Kafka優秀的性能,添加刪除都是常量級的性能,因此理論上,數據保存很長時間也不成問題。

參考:
http://kafka.apache.org/
https://www.zhihu.com/questio...
https://blog.csdn.net/yangyut...
http://www.cnblogs.com/huxiao...
https://www.cnblogs.com/ITtan...

稀疏索引:https://blog.csdn.net/qq_2223...
跳錶:https://www.jianshu.com/p/dc2...

相關文章
相關標籤/搜索