消息隊列二三事

最近在看kafka的代碼,就免不了想看看消息隊列的一些要點:服務質量(QOS)性能擴展性等等,下面一一探索這些概念,並談談在特定的消息隊列如kafka或者mosquito中是如何具體實現這些概念的。html

服務質量

服務語義

服務質量通常能夠分爲三個級別,下面說明它們不一樣語義。java

At most once

至多一次,消息可能丟失,但毫不會重複傳輸。
生產者:徹底依賴底層TCP/IP的傳輸可靠性,不作特殊處理,所謂「發送即忘」。kafka中設置acks=0
消費者:先保存消費進度,再處理消息。kafka中設置消費者自動提交偏移量並設置較短的提交時間間隔。linux

At least once

至少一次,消息毫不會丟,可是可能會重複。
生產者:要作消息防丟失的保證。kafka中設置acks=1 或 all並設置retries>0
消費者:先處理消息,再保存消費進度。kafka中設置消費者自動提交偏移量並設置很長的提交時間間隔,或者直接關閉自動提交偏移量,處理消息後手動調用同步模式的偏移量提交。算法

Exactly once

精確一次,每條消息確定會被傳輸一次且僅一次。
這個級別光靠消息隊列自己並很差保證,有可能要依賴外部組件。
生產者:要作消息防丟失的保證。kafka中設置acks=1 或 all並設置retries>0。mosquito中經過四步握手與DUP、MessageID等標識來實現單次語義。
消費者:要作消息防重複的保證,有多種方案,如:在保存消費進度和處理消息這兩個操做中引入兩階段提交協議;讓消息冪等;讓消費處理與進度保存處於一個事務中來保證原子性。kafka中關閉自動提交偏移量,並設置自定義的再平衡監聽器,監聽到分區發生變化時從外部組件讀取或者存儲偏移量,保證本身或者其餘消費者在更換分區時能讀到最新的偏移量從而避免重複。總之就是結合ConsumerRebalanceListenerseek和一個外部系統(如支持事務的數據庫)共同來實現單次語義。此外,kafka還提供了GUID以便用戶自行實現去重。kafka 0.11版本經過3個大的改動支持EOS:1.冪等的producer;2. 支持事務;3. 支持EOS的流式處理(保證讀-處理-寫全鏈路的EOS)。
這三個級別可靠性依次增長,可是延遲帶寬佔用也會增長,因此實際狀況中,要依據業務類型作出權衡。數據庫

可靠性

上面的三個語義不只須要生產者和消費者的配合實現,還要broker自己的可靠性來進行保證。可靠性就是隻要broker向producer發出確認,就必定要保證這個消息能夠被consumer獲取。apache

kafka 中一個topic有多個partition,每一個partition又有多個replica,全部replica中有一個leaderISR是必定要同步leader後才能返回提交成功的replica集OSR內的replica盡力的去同步leader,可能數據版本會落後。在kafka工做的過程當中,若是某個replica同步速度慢於replica.lag.time.max.ms指定的閾值,則被踢出ISR存入OSR,若是後續速度恢復能夠回到ISR中。能夠配置min.insync.replicas指定ISR中的replica最小數量,默認該值爲1。LEO是分區的最新數據的offset,當數據寫入leader後,LEO就當即執行該最新數據,至關於最新數據標識位。HW是當寫入的數據被同步到全部的ISR中的副本後,數據才認爲已提交,HW更新到該位置,HW以前的數據才能夠被消費者訪問,保證沒有同步完成的數據不會被消費者訪問到,至關於全部副本同步數據標識位。segmentfault

每一個partition的全部replica須要進行leader選舉(依賴ZooKeeper)。在leader宕機後,只能從ISR列表中選取新的leader,不管ISR中哪一個副本被選爲新的leader,它都知道HW以前的數據,能夠保證在切換了leader後,消費者能夠繼續看到HW以前已經提交的數據。當ISR中全部replica都宕機該partition就不可用了,能夠設置unclean.leader.election.enable=true,該選項使得kafka選擇任何一個活的replica成爲leader而後繼續工做,此replica可能不在ISR中,就可能致使數據丟失。因此實際使用中須要進行可用性可靠性的權衡。設計模式

kafka建議數據可靠存儲不依賴於數據強制刷盤(會影響總體性能),而是依賴於replica網絡

順序消費

順序消費是指消費者處理消息的順序與生產者投放消息的順序一致。
主要可能破壞順序的場景是生產者投放兩條消息AB,而後A失敗重投遞致使消費者拿到的消息是BA。多線程

kafka中能保證分區內部消息的有序性,其作法是設置max.in.flight.requests.per.connection=1,也就是說生產者在未獲得broker對消息A的確認狀況下是不會發送消息B的,這樣就能保證broker存儲的消息有序,天然消費者請求到的消息也是有序的。
可是咱們明顯能感受到這會下降吞吐量,由於消息不能並行投遞了,並且會阻塞等待,也無法發揮 batch 的威力。
若是想要整個topic有序,那就只能一個topic一個partition了,一個consumer group也就只有一個consumer了。這樣就違背了kafka高吞吐的初衷。

重複消費

重複消費是指一個消息被消費者重複消費了。 這個問題也是上面第三個語義須要解決的。

通常的消息系統如kafka或者相似的rocketmq都不能也不提倡在系統內部解決,而是配合第三方組件,讓用戶本身去解決。究其緣由仍是解決問題的成本解決問題後得到的價值不匹配,因此乾脆不解決,就像操做系統對待死鎖同樣,採起「鴕鳥政策」。
可是kafka 0.11仍是處理了這個問題,見發行說明,維護者是想讓用戶無可挑剔嘛 [笑cry]。

性能

衡量一個消息系統的性能有許多方面,最多見的就是下面幾個指標。

鏈接數

是指系統在同一時刻能支持多少個生產者或者消費者的鏈接總數。鏈接數和broker採用的網絡IO模型直接相關,常見模型有:單線程鏈接每線程ReactorProactor等。
單線程一時刻只能處理一個鏈接,鏈接每線程受制於server的線程數量,Reactor是目前主流的高性能網絡IO模型,Proactor因爲操做系統對真異步的支持不太行因此還沒有流行。

kafka的broker採用了相似於NettyReactor模型:1(1個Acceptor線程)+N(N個Processor線程)+M(M個Work線程)。
其中Acceptor負責監聽新的鏈接請求,同時註冊OPACCEPT事件,將新的鏈接按照RoundRobin的方式交給某個Processor線程處理。
每一個Processor都有一個NIO selector,向 Acceptor分配的 SocketChannel 註冊 OPREAD、OPWRITE事件,對socket進行讀寫。N由num.networker.threads決定。
Worker負責具體的業務邏輯如:從requestQueue中讀取請求、數據存儲到磁盤、把響應放進responseQueue中等等。M的大小由num.io.threads決定。

Reactor模型通常基於IO多路複用(如selectepoll),是非阻塞的,因此少許的線程能處理大量的鏈接。
若是大量的鏈接都是idle的,那麼Reactor使用epoll的效率是槓槓的,若是大量的鏈接都是活躍的,此時若是沒有Proactor的支持就最好把epoll換成select或者poll
具體作法是-Djava.nio.channels.spi.SelectorProvidersun.nio.ch包下面的EPollSelectorProvider換成PollSelectorProvider

QPS

是指系統每秒能處理的請求數量。QPS一般能夠體現吞吐量(該術語很廣,能夠用TPS/QPS、PV、UV、業務數/小時等單位體現)的大小。

kafka中因爲能夠採用 batch 的方式(還能夠壓縮),因此每秒鐘能夠處理的請求不少(由於減小了解析量網絡往復次數磁盤IO次數等)。另外一方面,kafka每個topic都有多個partition,因此同一個topic下能夠並行(注意不是併發喲)服務多個生產者和消費者,這也提升了吞吐量。

平均響應時間

平均響應時間是指每一個請求得到響應須要的等待時間。

kafka中處理請求的瓶頸(也就是最影響響應時間的因素)最有可能出如今哪些地方呢?
網絡? 有可能,可是這個因素整體而言不是kafka能控制的,kafka能夠對消息進行編碼壓縮並批量提交,減小帶寬佔用;
磁盤? 頗有可能,因此kafka從分利用OS的pagecache,而且對磁盤採用順序寫,這樣能大大提高磁盤的寫入速度。同時kafka還使用了零拷貝技術,把普通的拷貝過程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,read buffer到app buffer的拷貝過程省略了(因此上下文切換也減小了),加快了處理速度。這個功能依賴於 javatransferTo,底層由 linuxsendfile系統調用實現。在 linux2.4及以上 中,數據能夠直接從 read buffer 拷貝到 NIC buffer ,達到了最短拷貝路徑。
此外還有文件分段技術,每一個partition都分爲多個segment,避免了大文件操做的同時提升了並行度。
CPU? 不大可能,由於消息隊列的使用並不涉及大量的計算,常見消耗有線程切換、編解碼、壓縮解壓、內存拷貝等,這些在大數據處理中通常不是瓶頸。

併發數

是指系統同時能處理的請求數量數。通常而言,QPS = 併發數/平均響應時間 或者說 併發數 = QPS*平均響應時間

這個參數通常只能估計或者計算,無法直接測。顧名思義,機器性能越好固然併發數越高咯。此外注意用上多線程技術而且提升代碼的並行度、優化IO模型、減小減小內存分配和釋放等手段都是能夠提升併發數的。

擴展性

消息系統的可擴展性是指要爲系統組件添加的新的成員的時候比較容易。

kafka中擴展性的基石就是topic採用的partition機制。第一,Kafka容許Partitioncluster中的Broker之間移動,以此來解決數據傾斜問題。第二,支持自定義的Partition算法,好比你能夠將同一個Key的全部消息都路由到同一個Partition上去(來得到順序)。第三,partition的全部replica經過ZooKeeper來進行集羣管理,能夠動態增減副本。第四,partition也支持動態增減。

對於producer,不存在擴展問題,只要broker還夠你鏈接就行。
對於consumer,一個consumer group中的consumer能夠增減,可是最好不要超過一個topicpartition數量,由於多餘的consumer並不能提高處理速度,一個partition在同一時刻只能被一個consumer group中的一個consumer消費

代碼上的可擴展性就屬於設計模式的領域了,這裏不談。

參考

《kafka技術內幕》
Kafka的存儲機制以及可靠性
Kafka 0.11.0.0 是如何實現 Exactly-once 語義的

查看原文,來自mageekchiu。總結不到位的地方請不吝賜教。

相關文章
相關標籤/搜索