消息隊列之Kafka——從架構技術從新理解Kafka

 

Apache Kafka® 是 一個分佈式流處理平臺. 這到底意味着什麼呢?

咱們知道流處理平臺有如下三種特性:html

  1. 可讓你發佈和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統相似。
  2. 能夠儲存流式的記錄,而且有較好的容錯性。
  3. 能夠在流式記錄產生時就進行處理。

Kafka適合什麼樣的場景?linux

它能夠用於兩大類別的應用:算法

  1. 構造實時流數據管道,它能夠在系統或應用之間可靠地獲取數據。 (至關於message queue)
  2. 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,經過kafka stream topic和topic之間內部進行變化

Kafka有四個核心的API:數據庫

  • The Producer API 容許一個應用程序發佈一串流式的數據到一個或者多個Kafka topic。
  • The Consumer API 容許一個應用程序訂閱一個或多個 topic ,而且對發佈給他們的流式數據進行處理。
  • The Streams API 容許一個應用程序做爲一個流處理器,消費一個或者多個topic產生的輸入流,而後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。
  • The Connector API 容許構建並運行可重用的生產者或者消費者,將Kafka topics鏈接到已存在的應用程序或者數據系統。好比,鏈接到一個關係型數據庫,捕捉表(table)的全部變動內容。

在Kafka中,客戶端和服務器使用一個簡單、高性能、支持多語言的 TCP 協議.此協議版本化而且向下兼容老版本, 咱們爲Kafka提供了Java客戶端,也支持許多其餘語言的客戶端。apache

————————————————————————————————————————————————api

以上摘自Apache Kafka官網緩存

 

而本文關注的焦點是:構造實時流數據管道,即message queue部分。也就是咱們常使用的「消息隊列」部分,這部分自己也是Kafka最初及最基本的底層設計。服務器

 

讓咱們回到最初Kafka尚未設計出來的時候,經過從新設計Kafka,一步步瞭解爲何Kafka是咱們如今看到的樣子,到時咱們將瞭解到Kafka做爲消息隊列會高吞吐量、分佈式、高容錯穩定。咱們把這個項目命名爲:Kafka-R網絡

 

如今咱們開始設計Kafka-R,咱們正式設計Kafka-R以前須要考慮設計目標,也就是個人Kafka-R設計出來究竟是用來幹嗎的,適用於什麼業務場景,解決什麼需求痛點。數據結構

能夠很快想到:數據交換。這是消息隊列的基本功能與要求。

而後呢?能夠做爲個大平臺,支持多語言,最好能知足大公司的業務需求,並且最好是實時的,至少是低延遲。

歸納起來就是:咱們設計Kafka-R的目標是能夠做爲一個統一的平臺來處理大公司可能擁有的全部實時數據饋送。

爲了知足咱們的Kafka-R的設計目標,那麼Kafka-R須要具有如下這些特徵:

具備高吞吐量來支持高容量事件流。

可以正常處理大量的數據積壓,以便支持來自離線系統的週期性數據加載。

系統必須處理低延遲分發,來處理更傳統的消息傳遞用例。

數據饋送分區與分佈式,以及實時。

系統在出現機器故障時可以保證容錯。

 

1、數據的存儲方式——in-memory&in-disk

有兩種選擇:第一種,使用in-memory cache,並在空間不足的的時候將數據flush到文件系統中。

另一種,使用in-disk,一開始把全部的數據寫入文件系統的持久化日誌中。

咱們的Kafka-R採用in-disk。實際上在此狀況數據被轉移到了內核的pagecache中。

「磁盤速度慢」是人們的廣泛印象,那麼Kafka-R的數據存儲和緩存基於文件系統,這樣的性能可以接受嗎?

而事實是,磁盤的速度比人們預期的要慢得多,也快得多,取決於人們使用磁盤的方式。

咱們知道磁盤有順序讀和隨機讀兩種模式,之間的性能差別很大,但具體差距多少呢?

使用6個7200rpm、SATA接口、RAID-5的磁盤陣列在JBOD配置下的順序寫入的性能約爲600MB/秒,但隨機寫入的性能僅約爲100k/秒,相差6000倍。 

線性的讀取和寫入是磁盤使用模式中最有規律的,而且操做系統進行了大量的優化。現代操做系統提供了read-ahead和write-behind技術,read-ahead是以大的data block爲單位預先讀取數據,而write-hehind將多個小型的邏輯寫合併成一次大型的物理磁盤寫入。

 

磁盤除了訪問模式,還有兩個低效率操做影響系統的性能:大量的小型I/O操做,過多的字節拷貝。

那麼咱們怎麼處理這些問題呢?

針對於大量的小型I/O操做,Kafka-R使用「消息塊」將消息合理分組。使網絡請求將多個消息打包成一組,而不是每次發送一條消息,從而使整組消息分擔網絡往返的開銷。

另外一個過多的字節拷貝,Kafka-R使用producer,broker和consumer都共享的標準化通用的二進制消息格式,這樣數據塊不用修改就能在他們之間傳遞。

保持這種通用的格式有什麼用呢?

能夠對持久化日誌塊的網絡傳輸進行優化。現代的unix操做系統提供了一個高度優化的編碼方式,用於將數據從pagecache轉移到socket網絡鏈接中。

數據從文件到套接字的常見數據傳輸過程:磁盤->pagecache->用戶空間緩存區->套接字緩衝區(內核空間)->NIC緩存區

1. 操做系統從磁盤讀區數據到內核空間的pagecache

2. 應用程序讀取內核空間的數據到用戶空間的緩存區

3. 應用程序將數據(用戶空間的緩存區)寫會內核空間到套接字緩衝區(內核空間)

4. 操做系統將數據從套接字緩衝區(內核空間)複製到可以經過網絡發送的NIC緩衝區

共進行了4次copy操做和2次系統調用,顯然很低效。在Linux系統中使用zero-copy(零拷貝)優化,其中之一sendfile,使用後的數據傳輸過程是這樣:磁盤->pagecache->NIC緩存區。

咱們的Kafka-R經過使用zero-copy優化技術,能夠用盡量低的消費代價讓多個consumer消費。數據在使用時只會被複制到pagecache中一次,這樣消息可以以接近網絡鏈接的速度上限進行消費。

 

 

2、數據結構——BTree&日誌解決方案

日誌解決方案即簡單讀取與追加來操做文件。

咱們的Kafka-R採用日誌解決方案。

咱們知道BTree是通用的數據結構,其普遍用於隨機的數據訪問。BTree的操做時間複雜度是O(log N),基本等同於常數時間,但在磁盤上則不成立。

每一個磁盤同時只能執行一次尋址,並行性受到限制。少許的磁盤尋址也有很高的開銷。數據翻倍時性能降低不止兩倍。 

而日誌解決方案的數據存儲架構,全部的操做時間複雜度都是O(1),而且讀不會阻塞寫,讀之間也不會相互影響。

因爲性能和數據的大小是徹底分離的,則服務器可使用大量廉價、低轉速的1+TB SATA硬盤,即便這些硬盤的尋址性能不好,在大規模讀寫的性能也能夠接受,並且三分之一的價格三倍的容量。

 

 

3、獲取數據方式——push-based&pull-based

由consumer從broker那裏pull數據呢?仍是從broker將數據push到consumer?

咱們的Kafka-R採用pull-based方式。

這是大多數消息系統所共享的傳統的方式:即producer把數據push到broker,而後consumer從broker中pull數據。

 

push-based系統優勢:

1. 讓consumer可以以最大速率消費。

push-based系統缺點:

1. 因爲broker控制着數據傳輸速率,因此很難處理不一樣的consumer。

2. 當消費速率低於生產速率時,consumer每每會不堪重負(本質相似於拒絕服務攻擊)。

3. 必須選擇當即發送請求或者積累更多的數據,而後在不知道下游的consumer可否當即處理它的狀況下發送這些數據。特別系統爲低延遲狀態下,這樣會極度糟糕浪費。

 

pull-based系統優勢:

1. 能夠大批量生產要發送給consumer的數據。

pull-based系統缺點:

1. 若是broker中沒有數據,consumer可能會在一個緊密的循環中結束輪詢,實際上會busy-waiting直到數據到來。

 

爲了不busy-waiting,咱們的Kafka-R的pull參數重加入參數,使得consumer在一個「long pull」中阻塞等待,知道數據到來(還能夠選擇等待給定字節長度的數據來確保傳輸長度)。

 

 

4、消費者的位置——consumed&offset

Kafka-R的消費過程:consumer經過向broker發出一個「fetch」請求來獲取它想要消費的partition。consumer的每一個請求在log中指定了對應的offset,並接收從該位置開始的一大塊數據。

consumed指經過狀態標示已經被消費的數據。

大多數消息系統都在broker上保存被消費消息的元數據。當消息被傳遞給consumer,broker要麼當即在本地記錄該事件,要麼等待consumer的確認後再記錄。

消費者的位置問題其實就是broker和consumer之間被消費數據的一致性問題。若是broker再每條消息被髮送到網絡的時候,當即將其標記爲consumd,那麼一旦consumer沒法處理該消息(可能由consumer崩潰或者請求超時或者其餘緣由致使),該消息就會丟失。爲了解決消息丟失的問題,許多消息系統增長了確認機制:即當消息被髮送出去的時候,消息被標記爲sent而不是consumed;而後broker會等待一個來自consumer的特定確認,再將消息標記爲consumed。這個策略修復了消息丟失的問題,但也產生了新問題。首先,若是consumer處理了消息但在發送確認以前出錯了,那麼該消息就會被消費兩次。第二個是有關性能的,broker必須爲每條消息保存多個狀態(首先對其加鎖,確保該消息只被發送一次,而後將其永久的標記爲consumed,以便將其移除)。還有更棘手的問題,好比如何處理已經發送但一直等不到確認的消息。

Kafka-R使用offse來處理消息丟失問題。topic被分割成一組徹底有序的partition,其中每個partition在任意給定的時間內只能被每一個訂閱了這個topic的consumer組中的一個consumer消費。意味着partition中每個consumer的位置僅僅是一個數字,即下一條要消費的消息的offset。這樣就能夠按很是低的代價實現和消息確認機制等同的效果。consumer還能夠回退到以前的offset再次消費以前的數據,這樣的操做違背了隊列的基本原則,但事實證實對consumer來講是個很重要的特性。若是consumer代碼由bug,而且在bug被發現以前有部分數據被消費了,consumer能夠在bug修復後經過回退到以前的offset再次消費這些數據。

 

 

 5、leader選舉——多數投票機制f+1&ISR

Kafka-R動態維護了一個同步狀態的備份的集合(a set of in-sync replicas),簡稱ISR。

在瞭解ISR以前咱們須要先了解in-sync。

Kafka-R判斷節點是否存活有兩種方式:

1. 節點必須能夠維護和ZooKeeper的鏈接,ZooKeeper經過心跳機制檢查每一個節點的鏈接。

2. 若是節點是個follower,它必須能及時的同步leader的寫操做,而且延時不能過久。

只有知足上面兩個條件的節點就處於「in sync」狀態。leader會追蹤全部「in sync」的節點,若是有節點掛掉了,或是寫超時,或是心跳超時,leader就會把它從同步副本列表中移除。

在ISR集合中節點會和leader保持高度一致,只有這個集合的成員纔有資格被選舉爲leader,一條消息必須被這個集合全部節點讀取並追加到日誌中了,這條消息才能視爲提交。

ISR集合發生變化會在ZooKeeper持久化,因此這個集合中的任何一個節點都有資格被選爲leader。

 

多數投票機制f+1顧名思義:假設咱們有2f+1個副本,若是在leader宣佈消息提交以前必須有f+1個副本收到該消息,而且若是咱們從這隻少f+1個副本之中,有着最完整的日誌記錄的follower裏來選擇一個新的leader,那麼在故障數小於f的狀況下,選舉出的leader保證具備全部提交的消息。

多數投票算法必須處理許多細節,好比精肯定義怎樣使日誌更加完整,確保在leader down期間,保證日誌一致性或者副本服務器的副本集改變。

多數投票機制有一個很是好的優勢:延遲取決於較快的服務器。也就是說,若是副本數是3,則備份完成的等待時間取決於最快的follwer。

所以提交時能避免最慢的服務器,這也是多數投票機制的優勢。

一樣多數投票的缺點也很明顯,多數的節點掛掉後不能選擇出leader。而經過冗餘來避免故障率,會下降吞吐量,不利於處理海量數據。

是一種Quorum讀寫機制(若是選擇寫入時候須要保證必定數量的副本寫入成功,讀取時須要保證讀取必定數量的副本,讀取和寫入之間有重疊)。

 

Kafka-R保證只要有只少一個同步中的節點存活,提交的消息就不會丟失。

在一次故障生存以後,大多數的quorum須要三個備份節點和一次確認,ISR只須要兩個備份節點和一次確認。

建立副本的單位是topic的partition,正常狀況下,每一個分區都有一個leader和零或多個follower。總的副本數是包括leader與全部follwer的總和。全部的讀寫操做都由leader處理,通常partition的數量都比broker的數量多的多,各分區的leader均勻分佈在broker中。全部的follower節點都同步leader節點的日誌,日誌中的消息和偏移量都和leader保持一致。

 

 

6、Uclean leader選舉——ISR副本&第一個副本

若是節點全掛了的服務恢復。

Kafka-R對於數據不會丟失時基於只少一個節點保持同步狀態,而一旦分區上的全部備份節點都掛了,就沒法保證了。

Kafka-R默認「第一個副本」策略。

 

ISR副本:等待一個ISR的副本從新恢復正常服務,並選擇這個副本做爲新leader(極大可能擁有所有數據)

第一個副本:選擇第一個從新恢復正常服務的副本(不必定是ISR)做爲leader。

 

這是可用性和一致性之間的簡單妥協,若是隻等待ISR的備份節點,只要ISR備份節點都掛了,那麼服務都一直會不可用,若是他們的數據損壞了或者丟失了,那就會是長久的宕機。另外一方面,若是不是ISR中的節點恢復服務而且咱們容許它成爲leader,那麼它的數據就是可信的來源,即便它不能保證記錄了每個已經提交的消息。

能夠配置屬性unclean.leader.election.enable禁用次策略,那麼就會使用「ISR副本」策略即停機時間優於不一樣步,以修改默認配置。

 

經過以上的架構技術的分析和選型,咱們就大體設計出了咱們的消息隊列Kafka-R

相關文章
相關標籤/搜索