簡介前端
Kafka是基於發佈訂閱的消息系統。最初起源於LinkedIn,於2011年成爲開源Apache項目,而後於2012年成爲Apache頂級項目。Kafka用Scala和Java編寫,因其分佈式可擴展架構及可持久化、高吞吐率特徵而被普遍使用。算法
消息隊列緩存
一般在項目中,咱們會由於以下需求而引入消息隊列模塊:服務器
1.解耦:消息系統至關於在處理過程當中間插入了一個隱含的、基於數據的接口層。無需預先定義不一樣的接口地址和請求應答規範,這容許數據上下游獨立決定雙方的處理過程,只須要約定數據格式便可任意擴展服務類型和業務需求。網絡
2.緩衝:消息系統做爲一個緩衝池,應對常見的訪問量不均衡情形。好比特殊節假日的流量劇增和每日不一樣時段的訪問量差別。以及處理不一樣數據類型所需的不一樣實時性。使整個業務處理架構以較低成本得到必定靈活性。架構
3. 異步:不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。app
Kafka的特色負載均衡
做爲一種分佈式的,基於發佈/訂閱的消息系統。Kafka的主要設計目標以下:異步
1.以時間複雜度爲O(1)的方式提供消息持久化能力,即便對TB級以上數據也能保證常數時間複雜度的訪問性能。分佈式
2.高吞吐率。即便在很是廉價的商用機器上也能作到單機支持每秒100K條以上消息的傳輸。
3.支持Kafka Server間的消息分區,及分佈式消費,同時保證每一個Partition內的消息順序傳輸。
4.同時支持離線數據處理和實時數據處理。
5.支持在線水平擴展。
Kafka體系架構
如上圖所示,一個典型的Kafka體系架構包括若干Producer(能夠是服務器日誌,業務數據,頁面前端產生的page view等等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干Consumer (Group),以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。
名詞解釋:
Topic & Partition
一個topic能夠認爲一個一類消息,每一個topic將被分紅多個partition,每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset爲一個long型的數字,它惟一標記一條消息。每條消息都被append到partition中,順序寫磁盤所以效率很是高。這是Kafka高吞吐率的重要基礎。
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪個Partition。若是Partition機制設置合理,全部消息能夠均勻分佈到不一樣的Partition裏,這樣就實現了負載均衡。若是一個Topic對應一個文件,那這個文件所在的機器I/O將會成爲這個Topic的性能瓶頸,而有了Partition後,不一樣的消息能夠並行寫入不一樣broker的不一樣Partition裏,極大的提升了吞吐率。能夠經過配置項num.partitions來指定新建Topic的默認Partition數量,也可在建立Topic時經過參數指定,同時也能夠在Topic建立以後經過Kafka提供的工具修改。
Kafka的複製機制
Kafka 中的每一個主題分區都被複制了 n 次,其中的 n 是主題的複製因子(replication factor)。這容許 Kafka 在集羣服務器發生故障時自動切換到這些副本,以便在出現故障時消息仍然可用。Kafka 的複製是以分區爲粒度的,分區的預寫日誌被複制到 n 個服務器。 在 n 個副本中,一個副本做爲 leader,其餘副本成爲 followers。顧名思義,producer 只能往 leader 分區上寫數據(讀也只能從 leader 分區上進行),followers 只按順序從 leader 上覆制日誌。
日誌複製算法(log replication algorithm)必須提供的基本保證是,若是它告訴客戶端消息已被提交,而當前 leader 出現故障,新選出的 leader 也必須具備該消息。在出現故障時,Kafka 會從失去 leader 的 ISR 裏面選擇一個 follower 做爲這個分區新的 leader ;換句話說,是由於這個 follower 是跟上 leader 寫進度的。
每一個分區的 leader 會維護一個 ISR。當 producer 往 broker 發送消息,消息先寫入到對應 leader 分區上,而後複製到這個分區的全部副本中。只有將消息成功複製到全部同步副本(ISR)後,這條消息纔算被提交。因爲消息複製延遲受到最慢同步副本的限制,所以快速檢測慢副本並將其從 ISR 中刪除很是重要。 Kafka 複製協議的細節會有些細微差異。
Kafka的同步機制
Kafka不是徹底同步,也不是徹底異步,而是一種ISR(In-Sync Replicas)機制:
1. leader會維護一個與其基本保持同步的Replica列表,該列表稱爲ISR,每一個Partition都會有一個ISR,並且是由leader動態維護 。
2. 若是一個follower比一個leader落後太多,或者超過必定時間未發起數據複製請求,則leader將其從ISR中移除
3. 當ISR中全部Replica都向Leader發送ACK時,leader才commit,這時候producer才能認爲一個請求中的消息都commit了。
Kafka提供了數據複製算法保證,若是leader發生故障或掛掉,一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本爲leader,或者說follower追趕leader數據。leader負責維護和跟蹤ISR中全部follower滯後的狀態。當producer發送一條消息到broker後,leader寫入消息並複製到全部follower。消息提交以後才被成功複製到全部的同步副本。消息複製延遲受最慢的follower限制,重要的是快速檢測慢副本,若是follower「落後」太多或者失效,leader將會把它從ISR中刪除。
消息傳輸保障
前面已經介紹了Kafka如何進行有效的存儲,以及瞭解了producer和consumer如何工做。接下來討論的是Kafka如何確保消息在producer和consumer之間傳輸。有如下三種可能的傳輸保障(delivery guarantee):
At most once: 消息可能會丟,但毫不會重複傳輸
At least once:消息毫不會丟,但可能會重複傳輸
Exactly once:每條消息確定會被傳輸一次且僅傳輸一次
Kafka的消息傳輸保障機制很是直觀。當producer向broker發送消息時,一旦這條消息被commit,因爲副本機制(replication)的存在,它就不會丟失。可是若是producer發送數據給broker後,遇到的網絡問題而形成通訊中斷,那producer就沒法判斷該條消息是否已經提交(commit)。雖然Kafka沒法肯定網絡故障期間發生了什麼,可是producer能夠retry屢次,確保消息已經正確傳輸到broker中,因此目前Kafka實現的是at least once。consumer從broker中讀取消息後,能夠選擇commit,該操做會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit以後的開始位置相同。固然也能夠將consumer設置爲autocommit,即consumer一旦讀取到數據當即自動commit。若是隻討論這一讀取消息的過程,那Kafka是確保了exactly once, 可是若是因爲前面producer與broker之間的某種緣由致使消息的重複,那麼這裏就是at least once。考慮這樣一種狀況,當consumer讀完消息以後先commit再處理消息,在這種模式下,若是consumer在commit後還沒來得及處理消息就crash了,下次從新開始工做後就沒法讀到剛剛已提交而未處理的消息,這就對應於at most once了。讀完消息先處理再commit。這種模式下,若是處理完了消息在commit以前consumer crash了,下次從新開始工做時還會處理剛剛未commit的消息,實際上該消息已經被處理過了,這就對應於at least once。
要作到exactly once就須要引入消息去重機制。Kafka文檔中說起GUID(Globally Unique Identifier)的概念,經過客戶端生成算法獲得每一個消息的unique id,同時可映射至broker上存儲的地址,即經過GUID即可查詢提取消息內容,也便於發送方的冪等性保證,須要在broker上提供此去重處理模塊,目前版本尚不支持。針對GUID, 若是從客戶端的角度去重,那麼須要引入集中式緩存,必然會增長依賴複雜度,另外緩存的大小難以界定。不僅是Kafka, 相似RabbitMQ以及RocketMQ這類商業級中間件也只保障at least once, 且也沒法從自身去進行消息去重。因此咱們建議業務方根據自身的業務特色進行去重,好比業務消息自己具有冪等性,或者藉助Redis等其餘產品進行去重處理。
Kafka做爲消息隊列:
傳統的消息有兩種模式:隊列和發佈訂閱。 在隊列模式中,消費者池從服務器讀取消息(每一個消息只被其中一個讀取); 發佈訂閱模式:消息廣播給全部的消費者。這兩種模式都有優缺點,隊列的優勢是容許多個消費者瓜分處理數據,這樣能夠擴展處理。可是,隊列不像多個訂閱者,一旦消息者進程讀取後故障了,那麼消息就丟了。而發佈和訂閱容許你廣播數據到多個消費者,因爲每一個訂閱者都訂閱了消息,因此沒辦法縮放處理。
kafka中的Consumer Group有兩種形式:
a、隊列:容許同名的消費者組成員共同處理。
b、發佈訂閱:廣播消息給多個消費者組。
kafka的每一個topic都具備這兩種模式。
傳統的消息系統按順序保存數據,若是多個消費者從隊列消費,則服務器按存儲的順序發送消息,可是,儘管服務器按順序發送,多個並行請求將會是異步的,所以消息可能亂序到達。這意味着只要消息存在並行消費的狀況,順序就沒法保證。消息系統經常經過僅設1個消費者來解決這個問題,可是這意味着沒用到並行處理。
kafka有比傳統的消息系統更強的順序保證。經過並行topic的parition,kafka提供了順序保證和負載均衡。每一個partition僅由同一個消費者組中的一個消費者消費到。並確保消費者是該partition的惟一消費者,並按順序消費數據。每一個topic有多個分區,則須要對多個消費者作負載均衡,但請注意,相同的消費者組中不能有比分區更多的消費者,不然多出的消費者一直處於空等待,不會收到消息。
Kafka做爲存儲系統
全部發布消息到消息隊列和消費分離的系統,實際上都充當了一個臨時存儲系統。Kafka仍是一個很是高性能的存儲系統。寫入到kafka的數據將寫到磁盤並複製到集羣中保證容錯性。並容許生產者等待消息應答,直到消息徹底寫入。kafka的存儲結構保證不管服務器上有50KB或50TB數據,執行效率是類似的,所以可達到水平擴展的目標。還能夠認爲kafka是一種專用於高性能,低延遲,提交日誌存儲,複製,和傳播特殊用途的分佈式文件系統。
Kafka流處理
Kafka的更高目標是實時流處理。在kafka中,流處理持續獲取輸入topic的數據,進行處理加工,而後寫入輸出topic。例如,一個零售APP,接收銷售和出貨的輸入流,統計數量或調整價格後輸出。
簡單的需求能夠直接使用producer和consumer API進行處理。對於複雜的轉換,Kafka提供了更強大的Streams API。可構建聚合計算或鏈接流到一塊兒的複雜應用程序。
綜上所述,Kafka 的設計能夠幫助咱們解決不少架構上的問題。可是想要用好 Kafka 的高性能、低耦合、高可靠性等特性,咱們須要很是瞭解 Kafka,以及咱們自身的業務需求,綜合考慮應用場景。