2018年第17周-RabbitMQ的模式與Kafka的設計

理解概念的一個方法

以前說過學習一個新的東西,最核心的就是掌握概念。而如何掌握概念呢?個人其中一個方法就是對比,把類似且模糊不清的兩個概念進行對比,這樣就理解更快。html

RabbitMQ模式

RabbitMQ有如下模式:
1.工做隊列(Worke Queues)
發消息和收消息都是直接經過隊列。在耗時比較多的任務,咱們把任務放入隊列裏,而後每一個工做者去獲取任務而後處理。因此這個工做隊列,也稱爲任務隊列(Task Queues)。這樣就將耗資源的任務從產生任務的應用上解耦出來。
這個模式最主要的特徵是:每一個任務只會分發到一個工做者中。 java

clipboard.png

2.發佈/訂閱(Publish/Subscribe)
這個發佈/訂閱和觀察者模式很像,但不是同一個東西。具體可看看發佈/訂閱和觀察者區別。
在這裏,RabbitMQ引入了交換器(Exchange)的概念,生產者不直接與隊列交互,而是經過交換器去與隊列進行交互(或者叫綁定)。也就說生產者只和交換器交互。引入交換器這概念後,這消息中間件能夠玩的花樣就多了。發佈/訂閱(Publish/Subscribe)就是其中的一個。這裏使用到的就是fanout的交換器。
這個模式最主要的特徵是:相似於廣播(broadcast),同個消息能夠發送到不一樣的隊列中去,並且這fanout交換器也不關係隊列有哪些,只要隊列和fanout交換器有綁定就發送,這樣就能夠將消息重複發送到不一樣的隊列上。
與工做隊列模式的區別是:發佈/訂閱的概念叫消息,而不是任務。因此消息能夠重複的放入不一樣的隊列中。 node

clipboard.png

3.路由(Routing)
路由模式也是引入交換器概念後,消息中間件玩的一個花樣。這裏用到的交換器叫direct。
在這模式裏,得新增兩個概念,分別是binding key和routing key, binding key是對於隊列來講的,在其與direct交換器綁定時指定binding key。而routing key是對於消息來講的,在其發送消息到direct交換器時,需指定routing key。這樣routing key可以和binding key匹配得上的(就是值相等),direct交換器就會將消息發送到對應binding key的隊列上。
這個模式最主要的特徵是:控制消息的精度更高,能夠指定哪些消息發送到哪些隊列裏。
與發佈/訂閱模式的區別是:區別是發佈/訂閱是廣播,將消息發送到任何綁定交換器的隊列上,因此沒能力選擇消息,而路由是需binding key和routing key匹配上,消息才能發送到對應binding key的隊列上,從而有能力去選擇消息。
與發佈/訂閱模式的相同點是:能夠將消息重複發送。
注:隊列能夠綁定多個routing key linux

4.主題(Topics)
固然,主題模式也是引入交換器概念後,消息中間件玩的一個花樣。這裏用到的交換器叫topic。
這裏用到的也是binding key和routing key,但不同的是,routing_key不能指定明確的key。而是這個key須要帶有點「.」,如 "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。而在這模式下,binding key的指定能夠更普遍些,其結構是這樣的".orange." 、 "..rabbit" 和"lazy.#"。其中*(星號)是能夠表明一個單詞,#(井號)是能夠表明零個或多個單詞。也跟路由相似的,只要這樣routing key可以和binding key匹配得上的(這裏能夠不用值相等,模式匹配上便可),topic交換器就會將消息發送到對應binding key的隊列上。面試

如Q1隊列的binding key是" .orange.",而 Q2是" ..rabbit"和"lazy.#"。若是消息的routing key是 "quick.orange.rabbit" 則此消息會被髮送到Q1和Q2隊列上。routing key是"quick.orange.fox"的消息只會發送到Q1隊列上。routing key是"lazy.pink.rabbit" 的消息只會發送到Q2隊列一次,routing key是 "quick.brown.fox" 的消息沒有匹配任何的binding key則此消息丟棄。
注:隊列能夠綁定多個routing key

5.遠程過程調用RPC(Remote Procedure Call)
RPC能夠遠程調用函數,等待服務器返回結果。算法

RPC的一個備註:RPC雖然用得很普遍,然而它也有不足之處,就是開發人員沒法清晰的知道本身調用的這個函數究竟是本地函數仍是很慢的RPC。這種困惑很容易致使出一個不可預測的系統和增長不必的複雜性致使難以定位問題。若是不用簡單的程序,誤用RPC還可能寫出很維護的意大利麪條式的代碼。。
對於這個問題,有三個建議數據庫

  • 保證函數是很容易被辨別出是本地函數仍是遠程函數。
  • 文檔化,清晰地記錄組件間的依賴。
  • 處理網絡帶來的異常,如超時等。

當出現用RPC是否必要時,若是能夠的話,你最好用異步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。
apache

RabbitMQ能夠用於構建RPC系統。一個客戶端和一個可擴展的RPC服務器。不過此功能不太經常使用,因此就不留篇幅來說解。大概原理就是能夠新增消息的屬性,從而將請求和響應的消息給匹配上。設計模式

觀察者模式和發佈/訂閱模式的區別

觀察者模式
觀察者模式的定義:對象間的一種一對多的組合關係,以便一個對象的狀態發生變化時,全部依賴於它的對象都獲得通知。
舉個例子api

假設你正在找一份軟件工程師的工做,對「香蕉公司」很感興趣。因此你聯繫了他們的HR,給了他你的聯繫電話。他保證若是有任何職位空缺都會通知你。這裏還有幾個候選人也你同樣很感興趣。因此職位空缺你們都會知道,若是你迴應了他們的通知,他們就會聯繫你面試。
該模式必須包含兩個角色:觀察者和觀察對象,香蕉公司就是被觀察者Subject,你就是Observers(還有和你同樣的候選人),當被觀察者狀態發送變化(好比職位空缺)就會通知(notify)觀察者,前提是Observers註冊到Subject裏,也就是香蕉公司的HR得有你的電話號碼。

發佈/訂閱模式
在觀察者模式中的Subject就像一個發佈者(Publisher),而觀察者(Observer)徹底能夠看做一個訂閱者(Subscriber)。subject通知觀察者時,就像一個發佈者通知他的訂閱者。這也就是爲何不少書和文章使用「發佈-訂閱」概念來解釋觀察者設計模式。可是這裏還有另一個流行的模式叫作發佈-訂閱設計模式。它的概念和觀察者模式很是相似。最大的區別是:
在發佈-訂閱模式,消息的發送方,叫作發佈者(publishers),消息不會直接發送給特定的接收者(訂閱者)。
意思就是發佈者和訂閱者不知道對方的存在。須要一個第三方組件,叫作消息中間件,它將訂閱者和發佈者串聯起來,它過濾和分配全部輸入的消息。換句話說,發佈/訂閱模式用來處理不一樣系統組件的信息交流,即便這些組件不知道對方的存在。

Kafka設計(DESIGN)

動機

咱們設計kafka,是但願它能成爲統一的平臺來處理大公司可能擁有的全部實時數據流。要作到這一點,咱們必須考慮至關廣的用例(use case)。

  • 它須要擁有高吞吐量來支持大容量事件流,如實時日誌聚合(real-time log aggregation)。
  • 它須要優雅地處理大量的數據備份,用於支持離線系統的週期性數據負載。
  • 它須要處理低延遲的傳遞,用於支持傳統的消息傳遞系統用例。

咱們想它是分區、分佈式、實時處理信息流,以建立新的信息流和傳輸信息流。這些動機造就了kafka的分區和消費者模型。
最後有可能數據流被輸入到其餘數據系統中,而這些系統須要對外提供服務,因此kafka須要有能力保證容錯性,哪怕存在有機器宕機。

爲了支持上述這些,咱們設計了一些獨特元素,更相似於數據庫日誌,而不是傳統的消息傳遞系統。

咱們將在下面部分中概述設計中的一些元素。

持久化(Persistence)

別懼怕文件系統

kafka重度依賴文件系統,用文件系統來存儲和緩存消息。人們都由這感受「硬盤很慢」,以至於你們懷疑一個持久化架構是否能具備競爭力的性能。實際上硬盤它很快也很慢,這取決於咱們怎麼去使用它。一個合理的硬盤架構一般能夠和網絡同樣快。(看來做者的網速都很快)。
硬盤性能的關鍵是,磁盤驅動器的吞吐量與過去十年的硬盤搜索的延遲有所不一樣。所以在6×7200rpm SATA RAID-5陣列的JBOD配置上的線性寫的性能大約爲600MB/秒,但隨機寫入的性能僅爲100k/秒,即超過6000倍的差異。這些線性讀寫是全部使用模式中最可預測的,而且由操做系統進行了大量優化。現代操做系統都提供了預讀取(read-ahead)和後寫(write-behind)操做的技術,這些支持屢次讀取到一個大塊中和合並小的邏輯寫造成一個大的物理寫。這問題更深刻的討論能夠在這找到 ACM Queue article,他們確實發現順序硬盤讀寫在某些狀況下比隨機內存訪問還快

爲了彌補這些性能差別,現代操做系統愈來愈着重使用主存來作磁盤緩存。現代操做系統很樂意將空餘內存轉移到磁盤緩存中,但這須要承受在內存被回收時帶來的一點點的性能損失。全部硬盤讀寫都經過這統一的緩存(磁盤緩存)。若是沒有直接IO,這特性並無那麼容易被拋棄。所以即便一個進場維護本身數據緩存時,這些數據將會在OS的頁緩存裏複製兩份,兩次高效地存儲全部東西。

此外,咱們是在JVM基礎上創建的,任何一位有花時間去研究Java內存的使用,都會知道如下兩件事情:
1.對象的內存開銷很是高,一般會使要存儲的數據的大小增大一倍(甚至更多)。
2.隨着堆內存的增長,Java垃圾收集會變得愈來愈繁瑣和緩慢。

也正是使用文件系統和依賴頁緩存(pagecache)帶來的結果優於維護一個內存中的緩存(in-memory cache)或是其餘結構,經過對全部空閒內存進行自動訪問,咱們至少能夠將可用緩存加倍,而且還能夠繼續加倍,經過存儲緊湊的字節結構而不是單個對象。這樣作的話能夠在32GB的機器上使用28-30GB緩存,而不用擔憂GC問題。並且,即便服務重啓,這些數據也保持熱度,對比起來,進程內存中的緩存在重啓後須要重建(對於10GB的緩存可能須要10分鐘),不然它須要從一個徹底冷的緩存開始(這可能意味更糟糕的初始化性能)。這也極大地簡化了代碼,由於在緩存和文件系統之間保持一致性的全部邏輯如今都在操做系統中,這比一次性在進程內嘗試更有效、更正確。若是您的磁盤使用傾向於線性讀取,那麼預讀取將有效地預操做這些緩存。

這代表了一個很是簡單的設計:在咱們耗盡空間的時候,與其保持儘量多的內存並將其所有清空到文件系統,不如反過來,數據都是被當即寫入到文件系統上的持久日誌中,而沒必要刷新到磁盤。實際上,這僅僅意味着它被轉移到內核的頁緩存中。
以頁緩存爲核心的設計,在這裏文章裏有被描述,此文章是Varnish的設計。

只須要常量時間(Constant Time Suffices)

在消息傳遞系統裏的持久化數據結構一般是一個消費者隊列關聯着一棵BTree或者其餘通用的隨機訪問數據結構來維護消息的元數據。BTree是一個萬能的數據結構,能夠在消息傳遞系統中支持各類事務和非事務性的語義。但它帶來至關高的成本:BTree操做是O(log N)。一般O(log N)本質上被認爲是等於常量時間,但對於硬盤操做則並非這樣。磁盤尋軌達到10ms,而且每一個磁盤一次只能執行一次尋軌,因此並行性是有限的。所以,即便是少許的磁盤尋軌也會致使很高的開銷。因爲存儲系統將很是快的緩存操做與很是慢的物理磁盤操做混合在一塊兒,所以當在緩存固定時,數據增長時,樹結構的性能一般是超線性的。數據加倍則會使速度慢兩倍以上。
直觀上,一個持久的隊列能夠創建在簡單的讀取和追加的形式,這一般也是日誌解決方案使用的。這結構有這樣的好處,全部操做都是O(1),而且讀操做不會阻塞寫和讀的操做。這是具備明顯的優點,是由於性能徹底與數據量大小解耦了,一個服務如今能夠充分利用那些大量的,且便宜,低轉速的SATA驅動器。雖然硬盤的尋軌性能差,但它們的大型讀和寫的性能仍是能夠接受的,並且仍是三分之一的價格就有三倍的容量。
在沒有任何性能懲罰的狀況下訪問幾乎無限的磁盤空間意味着咱們能夠提供一些在消息傳遞系統中不常見的特性。例如,在kafka中,咱們能夠在相對較長的時間內保留消息(好比一個星期),而不是每次消費完就刪除消息。這將給消費者帶來很大的靈活性。

效率(Efficiency)

咱們在效率方面付出大量的努力。咱們最初用例中的一個是處理網站活動數據,這能夠是很是大量的數據:每一個頁面的訪問都會產生許多寫操做。此外,咱們假設每條消息至少被一個消費者讀取(一般是不少消費者),所以咱們努力讓消費盡量的便宜。
咱們還發現,經歷過構建和運行多個相似的系統,有效的多租戶業務的關鍵是效率。
咱們在前面章節討論過硬盤的效率。一旦消除了糟糕的磁盤訪問模式,在這種類型的系統中有兩個常見的低效緣由:太多小的I/O操做和過分的字節複製。
這小IO問題發生在客戶端和服務器之間,和服務器自身的持久化操做中。
爲了不這種狀況,咱們的協議是圍繞一個「消息集(message set)」抽象構建的,該抽象能夠天然地將消息分組在一塊兒。這容許網絡請求將消息分組,並分攤網絡往返的開銷,而不是一次發送一條消息。服務器依次將大量的消息追加到其日誌中,而消費者一次獲取大量的線性塊。
這個簡單的優化產生數量級的加速。批處理致使了更大的網絡數據包、更大的順序磁盤操做、連續的內存塊等等,全部這些都使得Kafka能夠將隨機消息寫入的流變成 線性的寫 流給消費者。
另外一個低效率的是字節複製。在低消息率下,這不是一個問題,但在負載下的影響是顯著的。爲了不這種狀況,咱們採用了一種標準化的二進制消息格式,由生產者、代理和消費者共享(所以數據塊能夠在不進行修改的狀況下傳輸)。
broker維護的消息日誌自己就是一個文件目錄,每一個文件都由一個以生產者和消費者使用的相同格式寫入磁盤的消息集的序列填充。保持這種通用格式能夠優化最重要的操做:持久日誌塊的網絡傳輸。現代unix操做系統爲將數據從頁緩存傳輸到套接字提供了高度優化的代碼路徑;在Linux中,這是經過sendfile的系統調用完成的。
要了解sendfile的做用,首先最重要先理解將數據從文件傳輸到套接字的公共數據路徑:
1.操做系統從磁盤讀取數據到內核空間的頁緩存。
2.應用程序將數據從內核空間讀取到用戶空間緩衝區中。
3.應用程序將數據返回到內核空間,並將其寫入套接字緩衝區。
4.操做系統將數據從套接字緩衝區複製到經過網絡發送的NIC緩衝區。
有4次複製,兩次系統內核調用,這樣的效率固然就低下。使用sendfile,經過容許操做系統直接將數據從頁緩存發送到網絡,避免了重複複製。所以在這個優化的路徑中,只須要最後的複製,一次從磁盤複製到NIC緩衝區便可。——零拷貝(zero-copy)
咱們指望一個常見的用例是在一個主題上有多個使用者。使用上述的零拷貝優化,數據被徹底複製到頁緩存中,並在每次讀取時重複使用,而不是存儲在內存中並在每次讀取時將其複製到用戶空間。這就容許以接近網絡鏈接的極限的速率來讀取消息。
頁緩存和sendfile的組合意味着,在一個Kafka集羣上,在有消費者的機子上,您將看到磁盤上沒有任何讀取活動,由於它們將徹底從緩存中提供數據。
更多Java支持的sendfile和零拷貝,請點擊這裏

端到端的批量壓縮

在某性狀況下,事實上真正的瓶頸不是CPU也不是硬盤,而是網絡帶寬。對於須要在廣域網上的數據中心之間發送消息的數據管道來講,尤爲如此。固然,用戶本身能夠壓縮消息而不須要kafka的支持。但這可能致使很是差的壓縮比,特別是當消息的冗餘字段不少(如JSON裏的字段名和網站日誌裏的user agent或公共字符串)。高效的壓縮須要多個消息壓縮在一塊兒,而不是每一個消息獨立壓縮。
Kafka用高效的批處理格式支持這一點。能夠將一批消息聚合到一塊兒壓縮,並以這種形式發送到服務器。這批消息將以壓縮的形式寫入,而且將在日誌中保持壓縮,而且只會被使用者解壓。
Kafka支持GZIP、Snappy和LZ4壓縮協議。關於壓縮的更多細節能夠在這裏找到。

生產者(The Producer)

負載均衡(Load balancing)

生產者直接發送數據到broker,不須要任何的中間路由層,而接受的broker是該分區的leader。爲了幫助生產者實現這一點,全部Kafka節點均可以回答關於哪些是可用服務器的元數據的請求,以及在任何給定的時間內,某個主題的分區的leader是否容許生產者適當地發送它的請求。
由客戶端控制它想往哪一個分區生產消息。這能夠隨機地進行,實現一種隨機的負載平衡,或者能夠經過一些語義分區函數來實現。咱們提供了語義分區的接口,容許用戶指定一個分區的key,並使用這個key來作hash到一個分區(若是須要的話,也是能夠複寫這分區功能的)。例如,咱們選擇user的id做爲可用,則因此該用戶的信息都會發送到一樣的分區。這反過來又會讓消費者對他們的消費產生局部性的假設。這種明確設計的分區,容許消費者本身本地的處理。

異步發送(Asynchronus send)

批處理是效率的主要驅動因素之一,爲了可以批處理,kafka的生產者會嘗試在內存中積累數據,而後在一塊兒在一個請求中以大批量的形式發送出去。批處理這個能夠設置按固定的消息數量或按特定的延遲(64k或10ms)。這容許累積更多字節的發送出去,這樣只是在服務器上作少許的大IO操做。這種緩衝是可配置的,這樣提供了一種機制來以額外的延遲來提升吞吐量。
具體的配置)和生產者的api能夠在這文檔中找到。

消費者(The Consumer)

kafka消費者的工做方式是,向其想消費的分區的leader發送「fetch」請求。在每一個請求中消費者指定日誌的偏移量,而後接受回一大塊從偏移量開始的日誌。所以,消費者對position有重要的控制權,若是須要,能夠重置position來從新消費數據。

Push和pull

咱們首先考慮的一個問題是,消費者應該是從broker拉取消息,仍是應該是broker把消息推送給消費者。在這方面,kafka遵循了一種更傳統的設計,大多數消息傳遞系統也會用的,那就是數據是從生產者push到broker,消費者是從broker拉取數據。一些日誌集中系統,如Scribe和Apache Flume,遵循一個很是不一樣的,基於推送的路徑,將數據被推到下游。這兩種方法都由利弊,在基於推送的系統,因爲是broker得控制數據傳輸的速率,不一樣消費者可能要不一樣的速率。然而消費者通常的目的都是讓消費者本身可以以最大的速度進行消費,但在基於push的系統,當消費速率低於生產效率時,消費者就不知道該怎麼辦好了(本質上就是一種拒絕服務攻擊(DOS))。一個基於pull的系統就擁有很好的熟悉,消費者能夠簡單的調控速率。

基於pull的系統的另外一個優勢是,它能夠對發送給消費者的數據進行聚合的批處理。基於推送的系統必須選擇當即發送請求或積累更多數據,而後在不知道下游用戶是否可以當即處理它的狀況下發送它。若是對低延遲進行調優,這將致使僅在傳輸結束時發送一條消息,最終將被緩衝,這是浪費。基於pull的設計解決了這個問題,由於用戶老是在日誌的當前位置(或者是一些可配置的最大大小)以後提取全部可用的消息。所以,咱們能夠在不引入沒必要要的延遲的狀況下得到最佳的批處理。

基於pull的系統的缺點是,若是broker沒數據,則消費者可能會不停的輪訓。爲了不這一點,咱們在pull請求上提供了參數,容許消費者在「長輪訓」中阻塞,直到數據達到(而且能夠選擇等待,直到必定數量的本身能夠,確保傳輸的大小)。

你可能詳細其餘可能的設計,如只有pull,點到點。生產者會將本地的日誌寫到本地日誌中,而broker則會從這些日誌中拉取數據。一般還會提出相似的「存儲轉發(store-and-forward)」生產者。這頗有趣,可是咱們以爲不太適合咱們的目標用例:它有成千上萬的生產者。咱們在大規模上運行持久數據系統的經驗使咱們以爲,在許多應用程序中涉及到數千個磁盤,實際上並不會使事情變得更可靠,並且操做起來也會是一場噩夢。在實踐中,咱們發現,咱們能夠在不須要生產者持久化的狀況下,以大規模的SLAs來運行管道。

消費者的Position(Consumer Position)

使人驚訝的是,跟蹤所使用的內容是消息傳遞系統的關鍵性能點之一。
不少消息傳遞系統在broker中保存了關於什麼消息是被消費了的元數據。也就是說,當消息傳遞給消費者時,broker要麼當即記錄信息到本地,要麼就是等待消費者的確認。這是一個至關直觀的選擇,並且對於一臺機器服務器來講,很清楚地知道這些消息的狀態。因爲許多消息傳遞系統中用於存儲的數據結構都很糟糕,所以這(記錄消息狀態)也是一個實用的選擇——由於broker知道什麼是已經被消費的,因此能夠當即刪除它,保持數據的大小。
讓broker和消費者就已經消費的東西達成一致,這可不是小問題。若是一條消息發送到網絡上,broker就把它置爲已消費,但消費者可能處理這條消息失敗了(或許是消費者掛了,也或許是請求超時等),這條消息就會丟失了。爲了解決這個問題,不少消息傳遞系統增長了確認機制。當消息被髮送時,是被標誌爲已發送,而不是已消費;這是broker等待消費者發來特定的確認信息,則將消息置爲已消費。這個策略雖然解決了消息丟失的問題,但卻帶來了新的問題。第一,若是消費者在發送確認信息以前,在處理完消息以後,消費者掛了,則會致使此消息會被處理兩次。第二個問題是關於性能,broker必須保存每一個消息的不一樣狀態(首先先鎖住消息以至於不會讓它發送第二次,其次標誌位已消費從而能夠刪除它)。還有些棘手的問題要處理。如消息被髮送出去,但其確認信息一直沒返回。

kafka處理則不同。咱們的主題被分爲一個有序分區的集合,且每一個分區在任何給定的時間內只會被訂閱它的消費者組中的一個消費者給使用。這意味着每一個分區中的消費者的position僅僅是一個整數,這是下一次消費時,消息的偏移量。這使狀態(記錄是否被消費)很是小,每一個分區只有一個數字。這個狀態能夠被按期檢查。這樣確認一條消息是否被消費的成本就很低。

這樣還附加了一個好處。消費者能夠重置其最早的position從而從新消費數據。這雖然違反了隊列的公共契約,但它卻變成關鍵功能給許多消費者。例如,若是消費者代碼有一個bug,而且在一些消息被消費後才被發現,那麼當bug被修復後,消費者就能夠從新使用這些消息。

離線數據加載(Offline Data Load)

可擴展持久化容許只有週期性地使用批量數據的消費者的可能性,好比按期將批量數據加載到離線系統(如Hadoop或關係數據倉庫)。

消息傳遞語義(Message Delivery Semantics)

如今咱們已經瞭解了些生產者和消費者是怎麼工做的,接下來咱們說下kafka提供給生產者和消費者的語義保證。很明顯這裏提供瞭如下幾種消息傳遞保證機制:

  • 至多一次(At most once),這樣消息可能會丟失,但永遠不會從新傳遞。
  • 至少一次(At least once),這樣消息不可能會丟失,但可能會從新傳遞。
  • 有且僅有一次(Exactly once),這是你們想要的,每一個消息會被傳遞一次,並且也僅僅只有一次。

值得注意的是,這能夠歸結爲兩個問題:發佈消息的持久化保證,以及在消費消息時的保證。
不少系統聲稱提供「有且僅有一次」的傳遞語義,但閱讀這些細節時,會發現其中大部分都是誤導(他們不理解消費者或生產者可能掛掉的狀況,那些有多個消費者處理的狀況,或者是那些被寫入磁盤的數據可能丟失的狀況)。
kafka的語義很直接。在發佈消息時,咱們將消息「提交」到log中。一旦發佈的消息被提交,只要有一個broker複製這個消息被寫入活動分區,它就不會丟失。提交的消息的定義、活動分區以及咱們試圖處理的失敗的類型的描述將在下一節(副本)中詳細描述。如今咱們假設在完美的狀況下,如今讓咱們假設一個完美的、無損的broker,和嘗試理解對生產者和消費者的保證。若是一個生產者試圖發佈消息並經歷一個網絡錯誤,那麼就不能肯定該錯誤發生在消息提交以前仍是以後。這相似於插入到一個數據庫表的自動生成的主鍵的語義。
在0.11.0.0版本以前,若是一個生產者沒有收到一個消息已經提交的響應,那麼它幾乎沒有選擇,只能從新發送消息。這提供了「至少一次」的傳遞語義,由於若是原始請求實際上成功了,那麼在從新發送期間,消息可能再次被寫入到日誌中。從0.11.0.0開始,Kafka生產者也支持一個冪傳遞的選項,該選項保證從新發送不會致使日誌中有這重複的消息。爲了實現這一目標,broker爲每一個生產者分配一個ID,並使用由生產者發送消息時一塊兒把序列號發送到broker,這樣broker就能夠根據序列和id來處理重複的消息。一樣,從0.11.0.0開始,生產者支持使用相似於事務的語義向多個主題分區發送消息:即全部消息都已成功寫入或都失敗寫入。這種狀況的主要應用場景是在Kafka主題之間進行「有且僅有一次」的處理(以下所述)。
並不是全部的用例都須要這樣強的保證。對於延遲敏感的使用,咱們容許生產者指定它須要的持久化級別。若是生產者指定要等待消息被提交要在10ms完成。則生產者能夠指定它異步地執行發送,或者等待直到leader(但不必定是follower)獲得消息。
如今咱們描述下消費者視角下的語義。全部的副本都有相同的日誌和相同的偏移量。消費者控制它在這個日誌中的position。若是消費者從未崩潰,它能夠將這個position存儲在內存中,可是若是消費者崩潰了,咱們但願這個主題的分區來接替這個position的處理,那麼新的進程將須要選擇一個合適的position來開始處理。
消費者讀取消息時,有幾個處理消息和更新其位置的選項。

  1. 第二種是它先讀取消息,而後將position保存到日誌中,最後是處理消息。在這種狀況下,在保存其position以後,在保存處理消息產生的輸出以前,消費者進程可能會崩潰。在這種狀況下,接手處理的過程將從保存的position開始,即便在此position以前的一些消息未被處理。這是對應着「至多一次」的語義,失敗的消息可能不被處理。
  2. 第二種是它先讀取消息,而後處理消息,最後保存position到日誌中。在這種狀況下,在處理消息後,消費者進程可能會崩潰,可是在它保存它的position以前崩潰的。在這種狀況下,當新進程接手了它接收到的最初幾條消息時,或許這幾條消息就已經被處理過了。在消費者崩潰的狀況下,這至關於「至少一次」的語義。在許多狀況下,消息有主鍵,所以更新是冪等的(接收相同的消息兩次,只是用另外一個副本重寫了一個記錄)。

那「有且僅有一次」的語義怎樣(或者是說你到底想要什麼)?從kafka主題中獲取消息處理後發佈到其餘主題(如一個Kafka Streams應用),咱們能夠利用上面提到的版本0.11.0.0裏的新事務生產者的功能。消費者的position被當作一個消息存儲在一個主題,所以咱們能夠在與接收處理數據的輸出主題相同的事務中寫入kafka的偏移量。 若是事務被停止,消費者的position將恢復到原來的值,而輸出主題的生成數據將不會被其餘消費者看到,這取決於他們的「隔離級別」。在默認的「read_uncommitted」隔離級別中,全部消息對消費者都是可見的,即便它們是被停止的事務的一部分,可是在「read_committed」中,使用者只會從提交的事務中返回消息(以及任何不屬於事務的消息)。

當寫入外部系統時,限制是在須要協調消費者的position和實際存儲的輸出。實現這一目標的經典方法是在存儲消費者position和存儲消費者輸出之間引入兩階段提交。但這能夠更簡單地處理,而且一般經過讓消費者將其偏移量存儲在與輸出相同的位置。這樣作比較好,由於消費者可能想要寫入的輸出系統都不支持兩階段提交。做爲一個例子,考慮一個Kafka Connect鏈接器,它在HDFS中填充數據,以及它讀取的數據的偏移量,從而保證數據和偏移量都獲得了更新,或者二者都不更新。對於須要這些更強語義的其餘許多數據系統,咱們遵循相似的模式是爲了那些須要強一致性語義的系統,還爲了這些消息沒有主鍵來容許刪除重複數據。

所以kafka爲了kafka Streams,高效地支持「有且僅有一次」的傳遞,而且在Kafka主題之間傳輸和處理數據時,一般能夠使用事務生產者/消費者提供「有且僅有一次」的傳遞。對於其餘目標系統的「有且僅有一次」的傳遞通常須要協調,但kafka提供了偏移量,它能夠實現這要求(參見Kafka Connect)。不然,缺省狀況下Kafka保證「至少一次」傳遞,而且容許用戶禁止生產者的重試或消費者在處理數據以前提交position,從而實現「至多一次」的專遞。

副本(Replication)

Kafka經過一個可配置的服務器數量對每一個主題的分區進行復制日誌(你您能夠按主題設置此副本因子(replication factor))。這容許在集羣中的服務器發生故障時自動恢復,所以當在出現故障時仍然能夠使用消息。
其餘消息傳遞系統提供了副本相關的特性,但,咱們認爲,這彷佛是一種策略而已,並無大量的使用,並且還有個很大的缺點:slave是未被用上的,吞吐量受到嚴重的影響,恢復還須要繁瑣的人工配置,等等。kafka默認是使用了副本功能,實際上那些副本因子設置爲1的主題,咱們也會當作是使用副本功能的主題。
副本的最小單元是主題的分區。在沒有失敗的狀況下,kafka的每一個分區都是有一個leader,其follower能夠爲零個或多個。包括leader在內的副本數量就是副本因子。全部讀和寫都是經過leader分區。一般狀況,分區的數據量是多個broker,leader的數量時平均分配當每一個broker。follower的日誌和leader的日誌是徹底相同的——它們都具備相同的偏移量和相同順序的消息(固然,在任何給定的時刻,在日誌的末尾可能會有一些還未同步到的消息)。
follower也跟kafka的普通消費者同樣從leader消費消息。follower從leader拉消息時,有個很好的特性,那就時可讓follower很容易地批量把日誌應用到其(follower)日誌中。
跟不少 分佈式系統處理自動恢復 同樣,對於節點是否「存活(alive)」須要有一個明確的定義。對於kafka,節點存活有如下兩個條件:
1.節點必須維護它與ZooKeeper的session(經過ZooKeeper的心跳機制)
2.若是是slave,就必須複製leader,並且不能落後太遠。
知足上述兩個條件的節點,咱們更願意叫「已同步(in sync)」而不是模糊不清的「存活」或「失敗」。leader保持跟蹤這些「已同步」的節點。若是follower掛了,或者卡住了,或者落後太遠了,leader會講起從已同步的副本名單中移除。是有e replica.lag.time.max.ms這配置去控制卡住多長時間和落後多少副本數量。
在分佈式系統術語中,咱們只嘗試處理一個「失敗/恢復」模型,即節點忽然中止工做,而後恢復(可能不知道它們已經死亡)。kafka沒有處理所謂的「拜占庭式」的失敗,即節點產生任意或惡意的響應(多是因爲某些錯誤)。

如今,咱們能夠更精確地定義一個消息的提交,當全部副本都同步到分區,分區而且應用到其日誌中時,就會被認爲是提交的。只有提交的消息纔會分發給消費者。這就意味着消費者不用擔憂當leader崩潰時,消息會丟失。另外一方面,生產者能夠選擇等待消息提交或不提交,這取決與它們對延遲和持久化之間的權衡。生產者能夠使用acks這配置來控制這權衡。注意,這「最小數據量(minimun number)」同步副本的數量設置,是指當消息都同步到全部副本後,kafka再去檢查時,檢查的最小數量。若是生產者對確認要求不太嚴格,則消息一發布就能夠被使用了,即便同步副本數量還沒達到最小值。(這最小值能夠低到只有一個,那就是leader)。

kafka保證消息不會丟,只要任什麼時候候至少有一個已同步的副本存在。
kafka能夠在節點故障的狀況下可用。但存在網絡分區時,就可能沒法使用了。

副本日誌:法定人數,ISR和狀態機 (Replicated Logs: Quorums, ISRs, and State Machines (Oh my!))

分區就是一個副本日誌。副本日誌是分佈式數據系統的最基本的原語(primitvie)之一,並且有不少種實現方式。其餘系統能夠使用副本日誌做爲一種原語,用於在狀態機形式的分佈式系統。
對於一系列值的順序達成一致的過程(一般編號爲0、一、二、…),副本日誌就是將其模型化。有不少方法能夠實現這一點,但最簡單和最快的是leader來選擇序值。只有leader還存活,所喲follower都只須要複製值便可,順序由leader決定。
固然,若是leader不掛,那咱們不必要follower。當leader崩潰時,我從follower中選擇出新的leader。但follower本身可能落後或崩潰,因此咱們必須保證咱們選擇的是最新的follower。日誌複製算法必須這最基本的保證時,若是咱們告訴客戶端消息已經提交了,而此時leader掛了,咱們選擇的新leader也必須包含剛剛那個已經提交了的消息。這就產生了一個權衡:若是leader等待過多的follower確認消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.

若是你指定確認的數量和日誌(與leader對比過的)的數量,這樣就保證有重疊性,那麼這就叫法定人數(Quorums)。

這種權衡最多見的方法是,在提交決策和leader選舉中使用大多數投票。這不是kafka作的,但讓咱們去探索它,瞭解它的利弊。假設咱們有2f+1個副本。若是f+1節點收到消息,沒有超過f個節點失敗,則leader就保證全部消息都被提交,咱們選擇新leader時也同樣。這是由於咱們在任意節點上選擇f+1個節點,這f+1裏必須至少有一個節點包含全部已提交消息的副本。副本最完整的結點將會被選中爲新leader。這裏還有不少算法細節須要處理(如明肯定義日誌的完整性,leader崩潰時怎麼保證一致性,修改集羣中的服務器),這些咱們先暫時忽略。
多數投票方法有個很是好的特性:延遲僅僅取決於多臺最快的服務器。也就是說,若是副本因子時3,那麼延遲由最快的一個slave決定,而不是最慢的slave(leader一個、最快的slave一個,這就達到法定人數了)。
這個家族有不少算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。咱們知道的,更接近kafka的用的算法的學術出版是來自微軟的PacificA。
多數投票的不足之處就是,它不須要不少失敗的節點,就可讓你選擇不到leader。爲了容忍一個節點失敗,則須要3個節點,容忍2個,則須要5個節點。在咱們經驗裏,覺得只要剛恰好夠冗餘的副本,就能容忍一個節點的失敗,但這是不實際的,在5倍硬盤空間(5個硬盤,每一個硬盤佔1/5吞吐)狀況下,每次都要寫5次,這對於大量數據的問題時不切實際的。這也是爲何法定人數算法比較經常使用在集羣的配置文件如ZooKeeper,而不多用在原數據存儲上。例如在HDFS的namenode的高可用是創建在多數人投票,但這成本很高的算法不會用在它的數據存儲上。
Kafka使用了一個稍微不太同樣的方法去選擇法定人數。kafka動態的維護一個ISR(in-sync replicas)集合,集合裏面的節點都是已同步。只有這集合裏面的人才適合選舉爲leader。只有全部ISR都收到寫入分區,則這分區的寫入就會被認爲已提交。這ISR保存在ZooKeeper。對於kafka的使用模型來講,這是一個重要的因素,那裏有許多分區,而且確保leader的平衡很重要。ISR模型和f+1副本,一個kafka主題能夠容忍f個失敗(總共就f+1個節點)。
咱們想處理更多的用例,因此這個權衡咱們以爲是合理的。在實際狀況,對於容忍f個節點失敗,多數投票和ISR方法都是須要通用數量的副本確認(好比,容忍1個節點失敗,多數投票方法則須要3個副本和1個確認,ISR方法須要2個副本和1個確認)。確認提交而不須要由最慢的節點來確認這是多數投票方法的好處。但咱們以爲這是能夠經過由客戶端選擇是否阻塞消息提交,以及控制副本因子(下降)而增長吞吐量和磁盤空間來優化這個問題(這問題就是與多數投票對比)。
另外一個重要的設計是,kafka不要求崩潰節點在全部數據完整的狀況下恢復。在這個空間中,副本算法依賴於「穩定存儲」的存在並很多見,這種「穩定存儲」在任何故障恢復場景中都不能丟失,要保證一致性。這有兩個主要問題。首先,硬盤故障是咱們在持久化數據系統的實際操做中最多見的問題,問題發生後,一般也不會完整地保留數據。其次,即便這不是一個問題,咱們也不但願在每次寫入時都須要使用fsync,由於這樣會減小兩到三個數量級的性能。咱們容許一個副本從新加入ISR的協議,這協議確保在從新加入以前,它必須徹底從新同步,即便它在崩潰中丟失了未刷新的數據。

 不清晰的Leader選舉:若是它們都掛了呢?(Unclean leader election: What if they all die?)    

注意,Kafka對數據不丟失的保證是基於至少一個保持同步的副本。若是一個分區的副本都丟失了,則沒法保證數據不丟失。
然而在實際狀況下的系統當全部副本掛以後必須作一些合理的事情。若是很不辛遇到這種狀況,意識到後面會發生什麼這是很重要。可能會出現如下兩種狀況:
1.等待ISR裏的全部節點恢復,並選擇出新的leader(但願這leader還保存着全部的數據)。
2.選擇第一個副本(不須要是ISR裏面的)恢復,做爲leader。

如下是可用性和持久化的權衡。1、若是咱們等待全部ISR副本恢復,則咱們會等很長的時間。。2、若是副本的數據都丟了,則永遠沒法恢復。最後一個就是,若是一個沒有同步的副本恢復,咱們容許它爲leader,則認爲它的日誌是最新的,哪怕它沒有包含全部已提交的消息。在0.11.0.0版本里默認的選擇第一個權衡,用等待來換取數據的一致性。這個是能夠配置的,若是啓動時間比一致性重要,則修改這個 unclean.leader.election.enable。
這個困惑不只僅kafka有。它存在與任何基於法定人數算法的場景。例如,在多數投票的場景,若是你是去大多數服務器,在剩餘的服務器,你就必須在二者選其中一個,不是失去100%的數據就是丟失數據的一致性。

可用性和持久化的保證(Availability and Durability Guarantees)

生產者生成消息時,能夠選擇0個,1個或者所有副本確認。注意這裏的「所有副本確認」不能保證全部被分配副本的結點都能收到消息。默認的,當acks=all時,只要全部當前全部ISR都收到消息,則能夠確認消息。例如,一個主題被設置爲兩個副本和一個失敗(只有剩下一個ISR),而後全部acks=all的寫入都會是成功的。若是剩餘的副本也失敗,這樣消息就會被丟失。儘管這確保了分區的最大可用性,可是這種行爲可能不適合某些喜歡持久化而不是可用性的用戶。所以,咱們提供了兩種頂級的配置,可用於更傾向於消息持久化而不是可用性:
1.關閉不清晰的leader選舉——若是全部副本變得不可用,直到最近的leader變得可用,全部分區才能夠變得能夠用。這有效地避免了消息丟失的風險。請參閱上一節不清晰的Leader選舉。
2.指定最小的ISR數量——只有高過這最小數量,消息纔會被確認,這是爲了不在寫入一個副本時,並且副本掛了,致使消息丟失的風險。這個設置僅僅在生產者使用acks=all生效或保證消息在這數量以上的ISR確認。這個設置提供了一致性和高可用的權衡。ISR最小數量設置高一點,這樣更好的保證一致性。然而這樣會減小可用性,由於在ISR沒知足這數量時,分區是不可用的。

副本管理(Replica Management)

上訴討論副本,也僅僅是一份日誌,也就是主題的一個分區。然而kafka是管理成千上萬的分區。咱們試圖以循環(round-robin)方式在集羣中平衡分區,以免在大數據量的主題的全部分區都在少許節點上。一樣地,咱們試圖平衡leader,使每一個節點都是其必定份額分區的leader。
對ledaer選舉過程進行優化也很重要,由於這是服務不可用的窗口期。一個簡單的leader選舉會在一個節點失敗後,在該節點內全部分區,每一個分區都會舉行一次選舉。相反,咱們選擇一個broker做爲「controller」。這controller檢測broker層次的失敗,負責修改受故障影響的分區的leader。其結果是,咱們可以將許多須要的leadr變動批量處理,這使得選舉過程在大量的分區上變得更加便宜和快速。若是controller失敗了,其中一個存活的節點會變成新的controller。

日誌壓縮(Log Compaction)

日誌壓縮保證kafka在每一個分區,對於每一個key,至少保存其最近的一條消息。這解決了那些須要當應用或系統崩潰後,重啓時需從新加載數據的場景。
到目前位置,咱們只討論了簡單的數據保存方法,那就是當舊日誌數據超過必定時間或達到必定大小的時候會被刪除。這個適用於每條相對獨立的消息,如臨時事件。然而,還有一類很重要的數據,那就是根據key修改數據,一種可變的數據(例如在數據庫表數據的變動那樣)。
咱們討論一個具體的例子。一個主題包含了用戶emial信息,每次用戶更新他們的email信息,咱們都會發送消息到topic,是根據他們的userid作主鍵。如下是咱們發送的消息,userid是123,每條信息都對應着一次的email信息修改(省略號是省略其餘userid的消息)。

123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

日誌壓縮給了咱們更細顆粒度保留數據機制,這樣咱們就能夠保證只保留每個key最後的一次變動(如123 => bill@gmail.com)。這樣咱們保證了日誌裏都包含了全部key的最後一個值的快照。這就意味着下游的消費者能夠重建狀態而不須要保存全部的更變日誌。
讓咱們一些日誌壓縮有用的場景,而後咱們在看看是怎麼被使用上。
1.數據庫變動訂閱(Database chagne subscription)。咱們很常見到一份數據集會存在多種數據系統裏,並且這系統裏有一個相似數據庫那樣的(如RDBMS或新潮的key-value系統)。舉個例子,你有一個數據庫、一個緩存、一個搜索集羣和一個Hadoop集羣。這樣每次數據庫的修改,都得映射到那緩存、那搜索集羣和最後在Hadoop裏。在這個場景裏,你只是須要實時最新更新的日誌。但若是須要從新加載進緩存或恢復宕機的搜索節點,就可能須要完整的數據集。
2.事件源(Event sourcing)。這是一種應用設計風格,它將查詢和應用設計結合在一塊兒,並使用日誌做爲程序的主要存儲。
3.高可用日誌(Journaling for high-availability)。一個本地計算的進程能夠經過變動日誌來作到容錯,這樣另外一個進程就能從新加載這些變動繼續處理。一個具體的例子就是流式查詢系統,如計數、彙總和其餘「分組」操做。實時流式處理框架Samza就是使用這功能達到目的的。
在上述場景中,主要處理實時的變動,偶爾須要從新加載或從新處理時,能作的就只有從新加載全部數據。日誌壓縮提供了這兩個功能,處理實時數據變動,和從新加載數據。這種使用日誌的風格,詳情可參看點擊

這思路很簡單。若是咱們保存無窮無盡的日誌,保存上述場景中每一個變動日誌,並且仍是一開始就獲取每一個系統的狀態。使用這個完整的日誌,咱們就能夠恢復到任何一個時間點的狀態。但這種完整日誌的假設時不切實際的,由於對於那些每一行記錄都在變動屢次的系統,即便數據很小,日誌也會無限的增加下去。那咱們就簡單的丟棄舊日誌,雖然能夠限制空間的增加,但也沒法重建狀態——由於舊日誌被丟棄,可能一部分記錄的狀態沒法重建。
相對於粗粒度的基於時間的數據保留策略,日誌壓縮的策略是一種更細顆粒度,基於每一條記錄保存。這個想法是,有選擇性的刪除那些有多個變動記錄的一樣的key。這樣的日誌就保證每一個key都至少有一個最新的狀態。
數據保留策略能夠爲每一個主題設置,因此一個集羣裏有些主題的保存策略能夠設置爲大小和時間來保存數據,有主題也能夠經過壓縮保留。
這個功能的靈感是來自於LinkedIn裏最古老且最成功的基礎架構——一個被稱爲Databus的數據庫變動日誌緩存系統。
跟大多很多天志結構存儲系統不同的時,Kafka是爲了訂閱而設計的,組織數據的形式也是爲了更快的線性讀取和寫入。跟Databus不同之處是,kafka做爲真實源(source-of-truth)存儲,即便上游數據源不具有可重用性的狀況下,它仍是挺有用的。

無論是傳統的RDBMS仍是分佈式的NoSQL存儲在數據庫中的數據老是會更新的,相同key的新記錄更新數據的方式簡單來講有兩種:
1.直接更新(找到數據庫中的已有位置以最新的值替換舊的值)。
2.追加記錄(保留舊的值,查詢時再合併,或者也有一個後臺線程會按期合併)。
採用追加記錄的作法能夠在節點崩潰時用於恢復數據,還有一個好處是寫性能很高,由於是線性寫。
如下是各個數據系統的更新數據方式:

數據系統 更新數據追加到哪裏 數據文件 是否須要壓縮
ZooKeeper log snapshot 不要,由於數據量不大
Redis aof rdb 不須要,由於是內存數據庫
Cassandra commit log data.db 須要,數據存在本地文件
HBase commit log HFile 須要,數據存在HDFS
Kafka commit log commit log 須要,數據存在分區中的Segment裏

日誌壓縮基礎(Log Compaction Basics)

這裏有個更高層次的圖,展現kafka日誌的邏輯存儲結構,框框的每一個數字都是一條消息的偏移量(offset):

日誌的頭部(Log Head)就是傳統的kafka日誌。日誌的尾部(Log Tail)則是被壓縮過的日誌。Log Head是很密集的,偏移量時連續的,保留了全部的消息。值得注意的是在Log Tail的消息雖然被壓縮,但依然保留它一開始被寫入時的偏移量,這個偏移量是永遠不會被改變。並且這壓縮日誌裏的偏移量,在日誌裏依然時有效的。因此,時沒法區分下一個更高的偏移量是什麼,好比說,上面的例子,3六、 3七、 38都是屬於同一個位置。
以上說的都是數據更新時的日誌壓縮,固然日誌壓縮也支持刪除。當發送某個Key的最新版本的消息的內容爲null,這個Key將被刪除(某種程度上也算是更新,如上面的例子就是把email信息置爲null)。這個消息也稱刪除標誌(delete marker),這個刪除標誌會把以前跟這key相同的消息刪掉。但這刪除標誌比較特殊,特殊之處是它是過一段時間才被刪除,從而騰出磁盤空間。而數據刪除的時間點會被標誌爲「刪除保留點(delte retention point)」,也就是如上圖所示,這個圖展現也很特別,你看看兩個是point而不是pointer,也不是指向某個消息,而是消息與消息之間。說明它是個時間點,而不是指向某個消息的指針pointer。

壓縮時經過後臺按期複製日誌段(log segment)完成的。清除時並不會阻塞讀操做,並且還能夠配置不超過必定的IO,從而避免影響消費者和生產者。壓縮日誌段的過程以下:

日誌壓縮提供了什麼保證?(What guarantees does log compaction provide?)

日誌壓縮保證:
1.任何消費者只要是讀取日誌的頭部的,均可以看到全部消息,頭部的消息不會被刪除。這些消息都是有連續的偏移量。Topic的min.compaction.lag.ms參數可用於保證在指定時間內該消息的存在,而不會被壓縮。這提供了消息呆在頭部(未被壓縮)的時間的底線。
2.依然保持則消息的有序性。壓縮永遠不會從新給消息排序,而僅僅是刪除其部分而已。
3.消息的偏移量永遠不會改變。它永遠標誌着消息所在的位置。
4.任何從日誌最開始的地方開始處理都會至少看到每一個key的最終狀態。另外,只要消費者在delete.retention.ms(默認是24小時)這時間內達到日誌的頭部,則將會看到全部刪除記錄的刪除標誌。也就是說:因爲刪除標誌的移除和讀取是同時發生,因此若是錯過delete.retention.ms這時間,消費者會錯過刪除標誌。

 日誌壓縮細節(Log Compaction Details)

日誌壓縮經過日誌清除器(log cleaner)執行,後臺線程池複製日誌段,移除那些存在於Log Head中的記錄。每一個壓縮線程工做以下:
1.選擇Log Head中相對比Log Tail的比例高的日誌。
2.建立Log Head中每一個Key對應的最後一個偏移量的日誌摘要。
3.從頭至尾的開始複製,在複製過程當中刪除相同key的日誌。新的、乾淨的日誌段將馬上被交換(swap)到日誌裏,因此只需一個額外的日誌段大小的硬盤空間就能夠(不須要所有日誌的空間)。
4.Log Head的日誌摘要其實是一個空間緊湊的哈希表。每一個實體只須要24個字節空間。因此8G的cleaner空間,能夠處理大概366G的Log Head(假設每一個消息大小爲1k)。

 日誌清除器的配置(Configuring The Log Cleaner)

Kafka是默認啓用日誌清除器,是個線程池。若是要開啓指定主題的清理功能,你能夠在日誌裏添加如下屬性:

log.cleanup.policy=compact

這個能夠在建立主題時指定或修改主題時指定。

日誌清除器能夠設置多少消息在Log Head而不被刪除。這個啓用是經過設置壓縮時間段:

log.cleaner.min.compaction.lag.ms

若是不設置,則默認是除了最後一個segment以外,其他日誌段都會被壓縮,即最後一個日誌段不會被壓縮。任何已激活的日誌段都不會被壓縮,就算消息的時間已經超過了上面配置的時間,這裏的激活,是指有在消費。

配額(Quotas)   

Kafka集羣有能力強制性地要求控制broker中客戶端使用的資源。如下是兩類客戶的quotas:
1.網絡帶寬quotas,具體到字節(從0.9版本開始)。
2.請求速率quotas,具體到CPU的利用率(網絡和IO的比值)。

爲何配額是必須的?(Why are quotas necessary?)

生產者和消費者有可能生成/消費大量的數據或請求速率很是高,以至於佔滿了broker的資源,致使網絡飽和broker拒絕給其餘客戶端服務。使用quotas就能避免這個問題,在多租戶集羣上尤其重要,由於一部分低質量的客戶可能會下降高質量客戶的用戶體驗。實際上,能夠對API進行這樣的限制。

客戶組(Client groups)

Kafka客戶標識是用戶主體(user principal),用於表明用戶在這安全的集羣上的權限。在無鑑權的時候,broker經過可配置的PrincipalBuilder來提供用戶主體,用來分組。由客戶端應用選擇client-id做爲客戶的邏輯分組。元組(user,client-id)則定義了一個安全邏輯組,共享user principal和chient-id。
quotas能夠被應用到元組(user,client-id),user或client-id組。對於一個鏈接,匹配上的quota將會應用到此鏈接上。例如(user="test-user",client-id="test-client")擁有生產者quota是10MB/s,這個10MB的帶寬將會被user是「test-user」而且client-id是"test-client"的生產者進行共用。

配額的配置(Quota Configuration)

quota能夠按(user,client-id)配置,也能夠按user組配置,也能夠按client-id組配置。默認quota能夠被任何級別的quota給覆蓋。這個機制相似於每一個Topic能夠覆蓋本身的。ZooKeeper的/config/users的quota能夠覆蓋user和(user,client-id)的quota。/config/clients下的則能夠覆蓋client-id的quota。這些ZooKeeper的覆蓋會便可在因此broker中生效,這樣咱們就不須要修改配置時重啓服務器。詳情請點擊
quota配置的優先級以下:

1./config/users/<user>/clients/<client-id>  
2./config/users/<user>/clients/<default>  
3./config/users/<user>  
4./config/users/<default>/clients/<client-id>  
5./config/users/<default>/clients/<default>  
6./config/users/<default>  
7./config/clients/<client-id>  
8./config/clients/<default>

broker的(quota.producer.default, quota.consumer.default)屬性來給每一個client-id組設置默認的網絡帶寬。但後面的版本會刪除這些屬性。
client-id組的默認quota能夠在ZooKeeper中配置。

網絡帶寬配額(Network Bandwidth Quotas)

網絡帶寬quota,具體到字節,並且是有組裏的客戶一塊兒共享。默認的,每一個獨立的客戶組都有一個固定的網絡帶寬的quota。這quota配置在每一個broker。

請求速率配額(Request Rate Quotas)

請求速率quota,具體到時間的百分比,時間是在quota窗口裏每一個broker的處理請求的IO線程和網絡線程。 n%的quota表明一個線程的n%,因此quota總數是((num.io.threads+num.network.threads)×100)%。每一個客戶組在一個quota窗口中最多使用n%的IO線程和網絡線程。因爲分配給IO和網絡的線程數是根據broker主機的cpu個數,則每一個請求速率quota表明着CPU的百分比。

實施(Enforcement)

默認狀況下,每一個惟一的客戶組都會有一個集羣配置好的固定的quota。這個quota是定義在每一個broker上。咱們決定由每一個broer定義這些quota,而不是由集羣爲每一個client統一設置一個quota的緣由,是由於爲了方便共享quota的設置。
若是Broker檢測到超過quota了,會怎麼處理?在咱們的解決方案中,咱們是選擇下降速率,而不是直接返回錯誤。broker會去計算處理這問題的延遲時間,這段時間則不會馬上響應客戶端。這種超過quota的處理,對於客戶端來講是透明的。客戶端不須要作額外的操做。實際上,客戶端額外的動做,若是操做很差,還會加重超過quota的問題。
字節率和線程利用率都會在多個小窗口中監測(一秒鐘有30個窗口),以便快速準確的糾正quota違規行爲。
客戶端字節率在多個小窗口(例如每一個1秒的30個窗口)上進行測量,以便快速檢測和糾正配額違規。 一般,大的測量窗口(例如,每30秒10個窗口)會致使大量的流量,而後是長時間的延遲,這對用戶體驗方面並很差。

參考和翻譯:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum...

相關文章
相關標籤/搜索