Kafka設計解析(四)- Kafka Consumer設計解析

原創文章,轉載請務必將下面這段話置於文章開頭處。(已受權InfoQ中文站發佈
  本文轉發自Jason's Blog原文連接 http://www.jasongj.com/2015/08/09/KafkaColumn4html

摘要

  本文主要介紹了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer實現的語義,以及適用場景。以及將來版本中對High Level Consumer的從新設計--使用Consumer Coordinator解決Split Brain和Herd等問題。java

High Level Consumer

  不少時候,客戶程序只是但願從Kafka讀取數據,不太關心消息offset的處理。同時也但願提供一些語義,例如同一條消息只被某一個Consumer消費(單播)或被全部Consumer消費(廣播)。所以,Kafka Hight Level Consumer提供了一個從Kafka消費數據的高層抽象,從而屏蔽掉其中的細節並提供豐富的語義。
  算法

Consumer Group

  High Level Consumer將從某個Partition讀取的最後一條消息的offset存於Zookeeper中(Kafka從0.8.2版本開始同時支持將offset存於Zookeeper中與將offset存於專用的Kafka Topic中)。這個offset基於客戶程序提供給Kafka的名字來保存,這個名字被稱爲Consumer Group。Consumer Group是整個Kafka集羣全局的,而非某個Topic的。每個High Level Consumer實例都屬於一個Consumer Group,若不指定則屬於默認的Group。
  Zookeeper中Consumer相關節點以下圖所示
Consumer Zookeeper Structure
  
  不少傳統的Message Queue都會在消息被消費完後將消息刪除,一方面避免重複消費,另外一方面能夠保證Queue的長度比較短,提升效率。而如上文所述,Kafka並不刪除已消費的消息,爲了實現傳統Message Queue消息只被消費一次的語義,Kafka保證每條消息在同一個Consumer Group裏只會被某一個Consumer消費。與傳統Message Queue不一樣的是,Kafka還容許不一樣Consumer Group同時消費同一條消息,這一特性能夠爲消息的多元化處理提供支持。
kafka consumer group
  
  實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer在不一樣的Consumer Group便可。下圖展現了Kafka在LinkedIn的一種簡化部署模型。
kafka sample deployment in linkedin
  
  爲了更清晰展現Kafka Consumer Group的特性,筆者進行了一項測試。建立一個Topic (名爲topic1),再建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,而後經過Producer向topic1發送Key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時group2中的3個Consumer分別收到了Key爲1,2,3的消息,以下圖所示。
kafka consumer group
  注:上圖中每一個黑色區域表明一個Consumer實例,每一個實例只建立一個MessageStream。實際上,本實驗將Consumer應用程序打成jar包,並在4個不一樣的命令行終端中傳入不一樣的參數運行。數據庫

High Level Consumer Rebalance

  (本節所講述Rebalance相關內容均基於Kafka High Level Consumer)
  Kafka保證同一Consumer Group中只有一個Consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每個Consumer實例只會消費某一個或多個特定Partition的數據,而某個Partition的數據只會被某一個特定的Consumer實例所消費。也就是說Kafka對消息的分配是以Partition爲單位分配的,而非以每一條消息做爲分配單元。這樣設計的劣勢是沒法保證同一個Consumer Group裏的Consumer均勻消費數據,優點是每一個Consumer不用都跟大量的Broker通訊,減小通訊開銷,同時也下降了分配難度,實現也更簡單。另外,由於同一個Partition裏的數據是有序的,這種設計能夠保證每一個Partition裏的數據能夠被有序消費。
  若是某Consumer Group中Consumer(每一個Consumer只建立1個MessageStream)數量少於Partition數量,則至少有一個Consumer會消費多個Partition的數據,若是Consumer的數量與Partition數量相同,則正好一個Consumer消費一個Partition的數據。而若是Consumer的數量多於Partition的數量時,會有部分Consumer沒法消費該Topic下任何一條消息。
  以下例所示,若是topic1有0,1,2共三個Partition,當group1只有一個Consumer(名爲consumer1)時,該 Consumer可消費這3個Partition的全部數據。
  kafka rebalance 3 partition 1 consumer
  
  增長一個Consumer(consumer2)後,其中一個Consumer(consumer1)可消費2個Partition的數據(Partition 0和Partition 1),另一個Consumer(consumer2)可消費另一個Partition(Partition 2)的數據。
  kafka rebalance 3 partitin 2 consumer
  
  再增長一個Consumer(consumer3)後,每一個Consumer可消費一個Partition的數據。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2。
  kafka rebalance 3 partition 3 consumer
  
  再增長一個Consumer(consumer4)後,其中3個Consumer可分別消費一個Partition的數據,另一個Consumer(consumer4)不能消費topic1的任何數據。
  kafka rebalance 3 partition 4 consumer
  
  此時關閉consumer1,其他3個Consumer可分別消費一個Partition的數據。
  kafka rebalance 3 partition 3 consumer
  
  接着關閉consumer2,consumer3可消費2個Partition,consumer4可消費1個Partition。
  kafka rebalance 3 partition 2 consumer
  
  再關閉consumer3,僅存的consumer4可同時消費topic1的3個Partition。
  kafka rebalance 3 partition 1 consumerapache

  Consumer Rebalance的算法以下:json

  • 將目標Topic下的全部Partirtion排序,存於\(P_T\)
  • 對某Consumer Group下全部Consumer排序,存\(於C_G\),第\(i\)個Consumer記爲\(C_i\)
  • \(N=size(P_T)/size(C_G)\),向上取整
  • 解除\(C_i\)對原來分配的Partition的消費權(i從0開始)
  • 將第\(i\*N\)\((i+1)*N-1\)個Partition分配給\(C_i\)

  
  目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每個Consumer經過在Zookeeper上註冊Watch完成的。每一個Consumer被建立時會觸發Consumer Group的Rebalance,具體啓動流程以下:session

  • High Level Consumer啓動時將其ID註冊到其Consumer Group下,在Zookeeper上的路徑爲/consumers/[consumer group]/ids/[consumer id]
  • /consumers/[consumer group]/ids上註冊Watch
  • /brokers/ids上註冊Watch
  • 若是Consumer經過Topic Filter建立消息流,則它會同時在/brokers/topics上也建立Watch
  • 強制本身在其Consumer Group內啓動Rebalance流程

  在這種策略下,每個Consumer或者Broker的增長或者減小都會觸發Consumer Rebalance。由於每一個Consumer只負責調整本身所消費的Partition,爲了保證整個Consumer Group的一致性,當一個Consumer觸發了Rebalance時,該Consumer Group內的其它全部其它Consumer也應該同時觸發Rebalance。架構

  該方式有以下缺陷:併發

  根據Kafka社區wiki,Kafka做者正在考慮在還未發佈的0.9.x版本中使用中心協調器(Coordinator)。大致思想是爲全部Consumer Group的子集選舉出一個Broker做爲Coordinator,由它Watch Zookeeper,從而判斷是否有Partition或者Consumer的增減,而後生成Rebalance命令,並檢查是否這些Rebalance在全部相關的Consumer中被執行成功,若是不成功則重試,若成功則認爲這次Rebalance成功(這個過程跟Replication Controller很是相似)。具體方案將在後文中詳細闡述。
  oop

Low Level Consumer

  使用Low Level Consumer (Simple Consumer)的主要緣由是,用戶但願比Consumer Group更好的控制數據的消費。好比:

  • 同一條消息讀屢次
  • 只讀取某個Topic的部分Partition
  • 管理事務,從而確保每條消息被處理一次,且僅被處理一次

  與Consumer Group相比,Low Level Consumer要求用戶作大量的額外工做。

  • 必須在應用程序中跟蹤offset,從而肯定下一條應該消費哪條消息
  • 應用程序須要經過程序獲知每一個Partition的Leader是誰
  • 必須處理Leader的變化

  使用Low Level Consumer的通常流程以下

  • 查找到一個「活着」的Broker,而且找出每一個Partition的Leader
  • 找出每一個Partition的Follower
  • 定義好請求,該請求應該能描述應用程序須要哪些數據
  • Fetch數據
  • 識別Leader的變化,並對之做出必要的響應

Consumer從新設計

  根據社區社區wiki,Kafka在0.9.*版本中,從新設計Consumer多是最重要的Feature之一。本節會根據社區wiki介紹Kafka 0.9.*中對Consumer可能的設計方向及思路。
  

設計方向

簡化消費者客戶端
  部分用戶但願開發和使用non-java的客戶端。現階段使用non-java發SimpleConsumer比較方便,但想開發High Level Consumer並不容易。由於High Level Consumer須要實現一些複雜但必不可少的失敗探測和Rebalance。若是能將消費者客戶端更精簡,使依賴最小化,將會極大的方便non-java用戶實現本身的Consumer。
  
中心Coordinator
  如上文所述,當前版本的High Level Consumer存在Herd Effect和Split Brain的問題。若是將失敗探測和Rebalance的邏輯放到一個高可用的中心Coordinator,那麼這兩個問題便可解決。同時還可大大減小Zookeeper的負載,有利於Kafka Broker的Scale Out。
  
容許手工管理offset
  一些系統但願以特定的時間間隔在自定義的數據庫中管理Offset。這就要求Consumer能獲取到每條消息的metadata,例如Topic,Partition,Offset,同時還須要在Consumer啓動時獲得每一個Partition的Offset。實現這些,須要提供新的Consumer API。同時有個問題不得不考慮,便是否容許Consumer手工管理部分Topic的Offset,而讓Kafka自動經過Zookeeper管理其它Topic的Offset。一個可能的選項是讓每一個Consumer只能選取1種Offset管理機制,這可極大的簡化Consumer API的設計和實現。
  
Rebalance後觸發用戶指定的回調
  一些應用可能會在內存中爲每一個Partition維護一些狀態,Rebalance時,它們可能須要將該狀態持久化。所以該需求但願支持用戶實現並指定一些可插拔的並在Rebalance時觸發的回調。若是用戶使用手動的Offset管理,那該需求可方便得由用戶實現,而若是用戶但願使用Kafka提供的自動Offset管理,則須要Kafka提供該回調機制。

非阻塞式Consumer API
  該需求源於那些實現高層流處理操做,如filter by, group by, join等,的系統。現階段的阻塞式Consumer幾乎不可能實現Join操做。

如何經過中心Coordinator實現Rebalance

  成功Rebalance的結果是,被訂閱的全部Topic的每個Partition將會被Consumer Group內的一個(有且僅有一個)Consumer擁有。每個Broker將被選舉爲某些Consumer Group的Coordinator。某個Cosnumer Group的Coordinator負責在該Consumer Group的成員變化或者所訂閱的Topic的Partititon變化時協調Rebalance操做。

Consumer
  1) Consumer啓動時,先向Broker列表中的任意一個Broker發送ConsumerMetadataRequest,並經過ConsumerMetadataResponse獲取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的結構以下

ConsumerMetadataRequest
    {
      GroupId                => String
    }
    
    ConsumerMetadataResponse
    {
      ErrorCode              => int16
      Coordinator            => Broker
    }

  2)Consumer鏈接到Coordinator併發送HeartbeatRequest,若是返回的HeartbeatResponse沒有任何錯誤碼,Consumer繼續fetch數據。若其中包含IllegalGeneration錯誤碼,即說明Coordinator已經發起了Rebalance操做,此時Consumer中止fetch數據,commit offset,併發送JoinGroupRequest給它的Coordinator,並在JoinGroupResponse中得到它應該擁有的全部Partition列表和它所屬的Group的新的Generation ID。此時Rebalance完成,Consumer開始fetch數據。相應Request和Response結構以下

HeartbeatRequest
    {
      GroupId                => String
      GroupGenerationId      => int32
      ConsumerId             => String
    }
    
    HeartbeatResponse
    {
      ErrorCode              => int16
    }
    
    JoinGroupRequest
    {
      GroupId                     => String
      SessionTimeout              => int32
      Topics                      => [String]
      ConsumerId                  => String
      PartitionAssignmentStrategy => String
    }
    
    JoinGroupResponse
    {
      ErrorCode              => int16
      GroupGenerationId      => int32
      ConsumerId             => String
      PartitionsToOwn        => [TopicName [Partition]]
    }
    TopicName => String
    Partition => int32

Consumer狀態機
  Consumer狀態圖
  Down:Consumer中止工做
  Start up & discover coordinator:Consumer檢測其所在Group的Coordinator。一旦它檢測到Coordinator,即向其發送JoinGroupRequest。
  Part of a group:該狀態下,Consumer已是該Group的成員,並週期性發送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration錯誤碼,則轉換到Stopped Consumption狀態。若鏈接丟失,HeartbeatResponse包含NotCoordinatorForGroup錯誤碼,則轉換到Rediscover coordinator狀態。
  Rediscover coordinator:該狀態下,Consumer不中止消費而是嘗試經過發送ConsumerMetadataRequest來探測新的Coordinator,而且等待直到得到無錯誤碼的響應。
  Stopped consumption:該狀態下,Consumer中止消費並提交offset,直到它再次加入Group。
  
  
  
故障檢測機制
  Consumer成功加入Group後,Consumer和相應的Coordinator同時開始故障探測程序。Consumer向Coordinator發起週期性的Heartbeat(HeartbeatRequest)並等待響應,該週期爲 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms內未收到HeartbeatResponse,或者發現相應的Socket channel斷開,它即認爲Coordinator已宕機並啓動Coordinator探測程序。若Coordinator在session.timeout.ms內沒有收到一次HeartbeatRequest,則它將該Consumer標記爲宕機狀態併爲其所在Group觸發一次Rebalance操做。
  Coordinator Failover過程當中,Consumer可能會在新的Coordinator完成Failover過程以前或以後發現新的Coordinator並向其發送HeatbeatRequest。對於後者,新的Cooodinator可能拒絕該請求,導致該Consumer從新探測Coordinator併發起新的鏈接請求。若是該Consumer向新的Coordinator發送鏈接請求太晚,新的Coordinator可能已經在此以前將其標記爲宕機狀態而將之視爲新加入的Consumer並觸發一次Rebalance操做。

Coordinator
  1)穩定狀態下,Coordinator經過上述故障探測機制跟蹤其所管理的每一個Group下的每一個Consumer的健康狀態。
  2)剛啓動時或選舉完成後,Coordinator從Zookeeper讀取它所管理的Group列表及這些Group的成員列表。若是沒有獲取到Group成員信息,它不會作任何事情直到某個Group中有成員註冊進來。
  3)在Coordinator完成加載其管理的Group列表及其相應的成員信息以前,它將爲HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete錯誤碼。此時,Consumer會從新發送請求。
  4)Coordinator會跟蹤被其所管理的任何Consumer Group註冊的Topic的Partition的變化,併爲該變化觸發Rebalance操做。建立新的Topic也可能觸發Rebalance,由於Consumer能夠在Topic被建立以前就已經訂閱它了。
  Coordinator發起Rebalance操做流程以下所示。
kafka coordinator rebalance

Coordinator狀態機
  Coordinator狀態圖
  Down:Coordinator再也不擔任以前負責的Consumer Group的Coordinator
  Catch up:該狀態下,Coordinator競選成功,但還未能作好服務相應請求的準備。
  Ready:該狀態下,新競選出來的Coordinator已經完成從Zookeeper中加載它所負責管理的全部Group的metadata,並可開始接收相應的請求。
  Prepare for rebalance:該狀態下,Coordinator在全部HeartbeatResponse中返回IllegalGeneration錯誤碼,並等待全部Consumer向其發送JoinGroupRequest後轉到Rebalancing狀態。
  Rebalancing:該狀態下,Coordinator已經收到了JoinGroupRequest請求,並增長其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功後,它會等待接收包含新的Consumer Generation ID的HeartbeatRequest,並轉至Ready狀態。

Coordinator Failover
  如前文所述,Rebalance操做須要經歷以下幾個階段
  1)Topic/Partition的改變或者新Consumer的加入或者已有Consumer中止,觸發Coordinator註冊在Zookeeper上的watch,Coordinator收到通知準備發起Rebalance操做。
  2)Coordinator經過在HeartbeatResponse中返回IllegalGeneration錯誤碼發起Rebalance操做。
  3)Consumer發送JoinGroupRequest
  4)Coordinator在Zookeeper中增長Group的Generation ID並將新的Partition分配狀況寫入Zookeeper
  5)Coordinator發送JoinGroupResponse
  
  在這個過程當中的每一個階段,Coordinator均可能出現故障。下面給出Rebalance不一樣階段中Coordinator的Failover處理方式。
  1)若是Coordinator的故障發生在第一階段,即它收到Notification並將來得及做出響應,則新的Coordinator將從Zookeeper讀取Group的metadata,包含這些Group訂閱的Topic列表和以前的Partition分配。若是某個Group所訂閱的Topic數或者某個Topic的Partition數與以前的Partition分配不一致,亦或者某個Group鏈接到新的Coordinator的Consumer數與以前Partition分配中的不一致,新的Coordinator會發起Rebalance操做。
  2)若是失敗發生在階段2,它可能對部分而非所有Consumer發出帶錯誤碼的HeartbeatResponse。與第上面第一種狀況同樣,新的Coordinator會檢測到Rebalance的必要性併發起一次Rebalance操做。若是Rebalance是由Consumer的失敗所觸發而且Cosnumer在Coordinator的Failover完成前恢復,新的Coordinator不會爲此發起新的Rebalance操做。
  3)若是Failure發生在階段3,新的Coordinator可能只收到部分而非所有Consumer的JoinGroupRequest。Failover完成後,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。與第1種狀況相似,它將發起新一輪的Rebalance操做。
  4)若是Failure發生在階段4,即它將新的Group Generation ID和Group成員信息寫入Zookeeper後。新的Generation ID和Group成員信息以一個原子操做一次性寫入Zookeeper。Failover完成後,Consumer會發送HeartbeatRequest給新的Coordinator,幷包含舊的Generation ID。此時新的Coordinator經過在HeartbeatResponse中返回IllegalGeneration錯誤碼發起新的一輪Rebalance。這也解釋了爲何每次HeartbeatRequest中都須要包含Generation ID和Consumer ID。
  5)若是Failure發生在階段5,舊的Coordinator可能只向Group中的部分Consumer發送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已經失效的Coordinator發送HeartbeatRequest或者提交Offset時會檢測到它已經失敗。此時,它將檢測新的Coordinator並向其發送帶有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer將檢測新的Coordinator並向其發送JoinGroupRequest,這將促使新的Coordinator發起新一輪的Rebalance。

Kafka系列文章

相關文章
相關標籤/搜索