轉載自 技術世界,原文連接 Kafka設計解析(四)- Kafka Consumer設計解析html
1、High Level Consumerjava
1. Consumer Group算法
2. High Level Consumer Rebalance數據庫
3、Low Level Consumerapache
4、Consumer從新設計session
1. 設計方向併發
本文主要介紹了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer實現的語義,以及適用場景。以及將來版本中對High Level Consumer的從新設計–使用Consumer Coordinator解決Split Brain和Herd等問題。oop
不少時候,客戶程序只是但願從Kafka讀取數據,不太關心消息offset的處理。同時也但願提供一些語義,例如同一條消息只被某一個Consumer消費(單播)或被全部Consumer消費(廣播)。所以,Kafka Hight Level Consumer提供了一個從Kafka消費數據的高層抽象,從而屏蔽掉其中的細節並提供豐富的語義。測試
High Level Consumer將從某個Partition讀取的最後一條消息的offset存於Zookeeper中(Kafka從0.8.2版本開始同時支持將offset存於Zookeeper中與將offset存於專用的Kafka Topic中)。這個offset基於客戶程序提供給Kafka的名字來保存,這個名字被稱爲Consumer Group。Consumer Group是整個Kafka集羣全局的,而非某個Topic的。每個High Level Consumer實例都屬於一個Consumer Group,若不指定則屬於默認的Group。fetch
Zookeeper中Consumer相關節點以下圖所示
不少傳統的Message Queue都會在消息被消費完後將消息刪除,一方面避免重複消費,另外一方面能夠保證Queue的長度比較短,提升效率。而如上文所述,Kafka並不刪除已消費的消息,爲了實現傳統Message Queue消息只被消費一次的語義,Kafka保證每條消息在同一個Consumer Group裏只會被某一個Consumer消費。與傳統Message Queue不一樣的是,Kafka還容許不一樣Consumer Group同時消費同一條消息,這一特性能夠爲消息的多元化處理提供支持。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer在不一樣的Consumer Group便可。下圖展現了Kafka在LinkedIn的一種簡化部署模型。
爲了更清晰展現Kafka Consumer Group的特性,筆者進行了一項測試。建立一個Topic (名爲topic1),再建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,而後經過Producer向topic1發送Key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時group2中的3個Consumer分別收到了Key爲1,2,3的消息,以下圖所示。
注:上圖中每一個黑色區域表明一個Consumer實例,每一個實例只建立一個MessageStream。實際上,本實驗將Consumer應用程序打成jar包,並在4個不一樣的命令行終端中傳入不一樣的參數運行。
(本節所講述Rebalance相關內容均基於Kafka High Level Consumer)
Kafka保證同一Consumer Group中只有一個Consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每個Consumer實例只會消費某一個或多個特定Partition的數據,而某個Partition的數據只會被某一個特定的Consumer實例所消費。也就是說Kafka對消息的分配是以Partition爲單位分配的,而非以每一條消息做爲分配單元。這樣設計的劣勢是沒法保證同一個Consumer Group裏的Consumer均勻消費數據,優點是每一個Consumer不用都跟大量的Broker通訊,減小通訊開銷,同時也下降了分配難度,實現也更簡單。另外,由於同一個Partition裏的數據是有序的,這種設計能夠保證每一個Partition裏的數據能夠被有序消費。
若是某Consumer Group中Consumer(每一個Consumer只建立1個MessageStream)數量少於Partition數量,則至少有一個Consumer會消費多個Partition的數據,若是Consumer的數量與Partition數量相同,則正好一個Consumer消費一個Partition的數據。而若是Consumer的數量多於Partition的數量時,會有部分Consumer沒法消費該Topic下任何一條消息。
以下例所示,若是topic1有0,1,2共三個Partition,當group1只有一個Consumer(名爲consumer1)時,該 Consumer可消費這3個Partition的全部數據。
增長一個Consumer(consumer2)後,其中一個Consumer(consumer1)可消費2個Partition的數據(Partition 0和Partition 1),另一個Consumer(consumer2)可消費另一個Partition(Partition 2)的數據。
再增長一個Consumer(consumer3)後,每一個Consumer可消費一個Partition的數據。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2。
再增長一個Consumer(consumer4)後,其中3個Consumer可分別消費一個Partition的數據,另一個Consumer(consumer4)不能消費topic1的任何數據。
此時關閉consumer1,其他3個Consumer可分別消費一個Partition的數據。
接着關閉consumer2,consumer3可消費2個Partition,consumer4可消費1個Partition。
再關閉consumer3,僅存的consumer4可同時消費topic1的3個Partition。
Consumer Rebalance的算法以下:
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每個Consumer經過在Zookeeper上註冊Watch完成的。每一個Consumer被建立時會觸發Consumer Group的Rebalance,具體啓動流程以下:
在這種策略下,每個Consumer或者Broker的增長或者減小都會觸發Consumer Rebalance。由於每一個Consumer只負責調整本身所消費的Partition,爲了保證整個Consumer Group的一致性,當一個Consumer觸發了Rebalance時,該Consumer Group內的其它Consumer也應該同時觸發Rebalance。
該方式有以下缺陷:
根據Kafka社區wiki,Kafka做者正在考慮在還未發佈的0.9.x版本中使用中心協調器(Coordinator)。大致思想是爲全部Consumer Group的子集選舉出一個Broker做爲Coordinator,由它Watch Zookeeper,從而判斷是否有Partition或者Consumer的增減,而後生成Rebalance命令,並檢查這些Rebalance在全部相關的Consumer中是否被執行成功,若是不成功則重試,若成功則認爲這次Rebalance成功(這個過程跟Replication Controller很是相似)。具體方案將在後文中詳細闡述。
使用Low Level Consumer (Simple Consumer)的主要緣由是,用戶但願比Consumer Group更好的控制數據的消費。好比:
與Consumer Group相比,Low Level Consumer要求用戶作大量的額外工做。
使用Low Level Consumer的通常流程以下
根據社區社區wiki,Kafka在0.9.*版本中,從新設計Consumer多是最重要的Feature之一。本節會根據社區wiki介紹Kafka 0.9.*中對Consumer可能的設計方向及思路。
簡化消費者客戶端
部分用戶但願開發和使用non-java的客戶端。現階段使用non-java開發SimpleConsumer比較方便,但想開發High Level Consumer並不容易。由於High Level Consumer須要實現一些複雜但必不可少的失敗探測和Rebalance。若是能將消費者客戶端更精簡,使依賴最小化,將會極大的方便non-java用戶實現本身的Consumer。
中心Coordinator
如上文所述,當前版本的High Level Consumer存在Herd Effect和Split Brain的問題。若是將失敗探測和Rebalance的邏輯放到一個高可用的中心Coordinator,那麼這兩個問題便可解決。同時還可大大減小Zookeeper的負載,有利於Kafka Broker的Scale Out。
容許手工管理offset
一些系統但願以特定的時間間隔在自定義的數據庫中管理Offset。這就要求Consumer能獲取到每條消息的metadata,例如Topic,Partition,Offset,同時還須要在Consumer啓動時獲得每一個Partition的Offset。實現這些,須要提供新的Consumer API。同時有個問題不得不考慮,便是否容許Consumer手工管理部分Topic的Offset,而讓Kafka自動經過Zookeeper管理其它Topic的Offset。一個可能的選項是讓每一個Consumer只能選取1種Offset管理機制,這可極大的簡化Consumer API的設計和實現。
Rebalance後觸發用戶指定的回調
一些應用可能會在內存中爲每一個Partition維護一些狀態,Rebalance時,它們可能須要將該狀態持久化。所以該需求但願支持用戶實現並指定一些可插拔的並在Rebalance時觸發的回調。若是用戶使用手動的Offset管理,那該需求可方便得由用戶實現,而若是用戶但願使用Kafka提供的自動Offset管理,則須要Kafka提供該回調機制。
非阻塞式Consumer API
該需求源於那些實現高層流處理操做,如filter by, group by, join等,的系統。現階段的阻塞式Consumer幾乎不可能實現Join操做。
##如何經過中心Coordinator實現Rebalance
成功Rebalance的結果是,被訂閱的全部Topic的每個Partition將會被Consumer Group內的一個(有且僅有一個)Consumer擁有。每個Broker將被選舉爲某些Consumer Group的Coordinator。某個Cosnumer Group的Coordinator負責在該Consumer Group的成員變化或者所訂閱的Topic的Partition變化時協調Rebalance操做。
Consumer
(1) Consumer啓動時,先向Broker列表中的任意一個Broker發送ConsumerMetadataRequest,並經過ConsumerMetadataResponse獲取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的結構以下
ConsumerMetadataRequest { GroupId => String } ConsumerMetadataResponse { ErrorCode => int16 Coordinator => Broker }
(2)Consumer鏈接到Coordinator併發送HeartbeatRequest,若是返回的HeartbeatResponse沒有任何錯誤碼,Consumer繼續fetch數據。若其中包含IllegalGeneration錯誤碼,即說明Coordinator已經發起了Rebalance操做,此時Consumer中止fetch數據,commit offset,併發送JoinGroupRequest給它的Coordinator,並在JoinGroupResponse中得到它應該擁有的全部Partition列表和它所屬的Group的新的Generation ID。此時Rebalance完成,Consumer開始fetch數據。相應Request和Response結構以下
HeartbeatRequest { GroupId => String GroupGenerationId => int32 ConsumerId => String } HeartbeatResponse { ErrorCode => int16 } JoinGroupRequest { GroupId => String SessionTimeout => int32 Topics => [String] ConsumerId => String PartitionAssignmentStrategy => String } JoinGroupResponse { ErrorCode => int16 GroupGenerationId => int32 ConsumerId => String PartitionsToOwn => [TopicName [Partition]] } TopicName => String Partition => int32
Consumer狀態機
Down:Consumer中止工做
Start up & discover coordinator:Consumer檢測其所在Group的Coordinator。一旦它檢測到Coordinator,即向其發送JoinGroupRequest。
Part of a group:該狀態下,Consumer已是該Group的成員,並週期性發送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration錯誤碼,則轉換到Stopped Consumption狀態。若鏈接丟失,HeartbeatResponse包含NotCoordinatorForGroup錯誤碼,則轉換到Rediscover coordinator狀態。
Rediscover coordinator:該狀態下,Consumer不中止消費而是嘗試經過發送ConsumerMetadataRequest來探測新的Coordinator,而且等待直到得到無錯誤碼的響應。
Stopped consumption:該狀態下,Consumer中止消費並提交offset,直到它再次加入Group。
故障檢測機制
Consumer成功加入Group後,Consumer和相應的Coordinator同時開始故障探測程序。Consumer向Coordinator發起週期性的Heartbeat(HeartbeatRequest)並等待響應,該週期爲 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms內未收到HeartbeatResponse,或者發現相應的Socket channel斷開,它即認爲Coordinator已宕機並啓動Coordinator探測程序。若Coordinator在session.timeout.ms內沒有收到一次HeartbeatRequest,則它將該Consumer標記爲宕機狀態併爲其所在Group觸發一次Rebalance操做。
Coordinator Failover過程當中,Consumer可能會在新的Coordinator完成Failover過程以前或以後發現新的Coordinator並向其發送HeatbeatRequest。對於後者,新的Cooodinator可能拒絕該請求,導致該Consumer從新探測Coordinator併發起新的鏈接請求。若是該Consumer向新的Coordinator發送鏈接請求太晚,新的Coordinator可能已經在此以前將其標記爲宕機狀態而將之視爲新加入的Consumer並觸發一次Rebalance操做。
Coordinator
(1)穩定狀態下,Coordinator經過上述故障探測機制跟蹤其所管理的每一個Group下的每一個Consumer的健康狀態。
(2)剛啓動時或選舉完成後,Coordinator從Zookeeper讀取它所管理的Group列表及這些Group的成員列表。若是沒有獲取到Group成員信息,它不會作任何事情直到某個Group中有成員註冊進來。
(3)在Coordinator完成加載其管理的Group列表及其相應的成員信息以前,它將爲HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete錯誤碼。此時,Consumer會從新發送請求。
(4)Coordinator會跟蹤被其所管理的任何Consumer Group註冊的Topic的Partition的變化,併爲該變化觸發Rebalance操做。建立新的Topic也可能觸發Rebalance,由於Consumer能夠在Topic被建立以前就已經訂閱它了。
Coordinator發起Rebalance操做流程以下所示。
Coordinator狀態機
Down:Coordinator再也不擔任以前負責的Consumer Group的Coordinator
Catch up:該狀態下,Coordinator競選成功,但還未能作好服務相應請求的準備。
Ready:該狀態下,新競選出來的Coordinator已經完成從Zookeeper中加載它所負責管理的全部Group的metadata,並可開始接收相應的請求。
Prepare for rebalance:該狀態下,Coordinator在全部HeartbeatResponse中返回IllegalGeneration錯誤碼,並等待全部Consumer向其發送JoinGroupRequest後轉到Rebalancing狀態。
Rebalancing:該狀態下,Coordinator已經收到了JoinGroupRequest請求,並增長其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功後,它會等待接收包含新的Consumer Generation ID的HeartbeatRequest,並轉至Ready狀態。
Coordinator Failover
如前文所述,Rebalance操做須要經歷以下幾個階段
(1)Topic/Partition的改變或者新Consumer的加入或者已有Consumer中止,觸發Coordinator註冊在Zookeeper上的watch,Coordinator收到通知準備發起Rebalance操做。
(2)Coordinator經過在HeartbeatResponse中返回IllegalGeneration錯誤碼發起Rebalance操做。
(3)Consumer發送JoinGroupRequest
(4)Coordinator在Zookeeper中增長Group的Generation ID並將新的Partition分配狀況寫入Zookeeper
(5)Coordinator發送JoinGroupResponse
在這個過程當中的每一個階段,Coordinator均可能出現故障。下面給出Rebalance不一樣階段中Coordinator的Failover處理方式。
(1)若是Coordinator的故障發生在第一階段,即它收到Notification並將來得及做出響應,則新的Coordinator將從Zookeeper讀取Group的metadata,包含這些Group訂閱的Topic列表和以前的Partition分配。若是某個Group所訂閱的Topic數或者某個Topic的Partition數與以前的Partition分配不一致,亦或者某個Group鏈接到新的Coordinator的Consumer數與以前Partition分配中的不一致,新的Coordinator會發起Rebalance操做。
(2)若是失敗發生在階段2,它可能對部分而非所有Consumer發出帶錯誤碼的HeartbeatResponse。與第上面第一種狀況同樣,新的Coordinator會檢測到Rebalance的必要性併發起一次Rebalance操做。若是Rebalance是由Consumer的失敗所觸發而且Cosnumer在Coordinator的Failover完成前恢復,新的Coordinator不會爲此發起新的Rebalance操做。
(3)若是Failure發生在階段3,新的Coordinator可能只收到部分而非所有Consumer的JoinGroupRequest。Failover完成後,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。與第1種狀況相似,它將發起新一輪的Rebalance操做。
(4)若是Failure發生在階段4,即它將新的Group Generation ID和Group成員信息寫入Zookeeper後。新的Generation ID和Group成員信息以一個原子操做一次性寫入Zookeeper。Failover完成後,Consumer會發送HeartbeatRequest給新的Coordinator,幷包含舊的Generation ID。此時新的Coordinator經過在HeartbeatResponse中返回IllegalGeneration錯誤碼發起新的一輪Rebalance。這也解釋了爲何每次HeartbeatRequest中都須要包含Generation ID和Consumer ID。
(5)若是Failure發生在階段5,舊的Coordinator可能只向Group中的部分Consumer發送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已經失效的Coordinator發送HeartbeatRequest或者提交Offset時會檢測到它已經失敗。此時,它將檢測新的Coordinator並向其發送帶有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer將檢測新的Coordinator並向其發送JoinGroupRequest,這將促使新的Coordinator發起新一輪的Rebalance。