個人我的博客排版更舒服: https://www.luozhiyun.com/archives/260java
Kafka 體系架構包括若干 Producer、若干 Broker、若干 Consumer,以及一個 ZooKeeper 集羣。node
在 Kafka 中還有兩個特別重要的概念—主題(Topic)與分區(Partition)。apache
Kafka 中的消息以主題爲單位進行歸類,生產者負責將消息發送到特定的主題(發送到 Kafka 集羣中的每一條消息都要指定一個主題),而消費者負責訂閱主題並進行消費。數組
主題是一個邏輯上的概念,它還能夠細分爲多個分區,一個分區只屬於單個主題,不少時候也會把分區稱爲主題分區(Topic-Partition)。緩存
Kafka 爲分區引入了多副本(Replica)機制,經過增長副本數量能夠提高容災能力。同一分區的不一樣副本中保存的是相同的消息(在同一時刻,副本之間並不是徹底同樣),副本之間是「一主多從」的關係,其中 leader 副本負責處理讀寫請求,follower 副本只負責與 leader 副本的消息同步。當 leader 副本出現故障時,從 follower 副本中從新選舉新的 leader 副本對外提供服務。安全
如上圖所示,Kafka 集羣中有4個 broker,某個主題中有3個分區,且副本因子(即副本個數)也爲3,如此每一個分區便有1個 leader 副本和2個 follower 副本。網絡
分區中的全部副本統稱爲 AR(Assigned Replicas)。全部與 leader 副本保持必定程度同步的副本(包括 leader 副本在內)組成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一個子集。session
與 leader 副本同步滯後過多的副本(不包括 leader 副本)組成 OSR(Out-of-Sync Replicas),因而可知,AR=ISR+OSR。在正常狀況下,全部的 follower 副本都應該與 leader 副本保持必定程度的同步,即 AR=ISR,OSR 集合爲空。架構
Leader 副本負責維護和跟蹤 ISR 集合中全部 follower 副本的滯後狀態,當 follower 副本落後太多或失效時,leader 副本會把它從 ISR 集合中剔除。默認狀況下,當 leader 副本發生故障時,只有在 ISR 集合中的副本纔有資格被選舉爲新的 leader。ide
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 以前的消息。
LEO 是 Log End Offset 的縮寫,它標識當前日誌文件中下一條待寫入消息的 offset。
如上圖所示,第一條消息的 offset(LogStartOffset)爲0,最後一條消息的 offset 爲8,offset 爲9的消息用虛線框表示,表明下一條待寫入的消息。日誌文件的 HW 爲6,表示消費者只能拉取到 offset 在0至5之間的消息,而 offset 爲6的消息對消費者而言是不可見的。
整個生產者客戶端由兩個線程協調運行,這兩個線程分別爲主線程和 Sender 線程(發送線程)。
在主線程中由 KafkaProducer 建立消息,而後經過可能的攔截器、序列化器和分區器的做用以後緩存到消息累加器(RecordAccumulator,也稱爲消息收集器)中。Sender 線程負責從 RecordAccumulator 中獲取消息並將其發送到 Kafka 中。
RecordAccumulator
RecordAccumulator 主要用來緩存消息以便 Sender 線程能夠批量發送,進而減小網絡傳輸的資源消耗以提高性能。
主線程中發送過來的消息都會被追加到 RecordAccumulator 的某個雙端隊列(Deque)中,在 RecordAccumulator 的內部爲每一個分區都維護了一個雙端隊列。
消息寫入緩存時,追加到雙端隊列的尾部;Sender 讀取消息時,從雙端隊列的頭部讀取。
Sender 從 RecordAccumulator 中獲取緩存的消息以後,會進一步將本來<分區, Deque< ProducerBatch>> 的保存形式轉變成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集羣的 broker 節點。
KafkaProducer 要將此消息追加到指定主題的某個分區所對應的 leader 副本以前,首先須要知道主題的分區數量,而後通過計算得出(或者直接指定)目標分區,以後 KafkaProducer 須要知道目標分區的 leader 副本所在的 broker 節點的地址、端口等信息才能創建鏈接,最終才能將消息發送到 Kafka。
因此這裏須要一個轉換,對於網絡鏈接來講,生產者客戶端是與具體的 broker 節點創建的鏈接,也就是向具體的 broker 節點發送消息,而並不關心消息屬於哪個分區。
InFlightRequests
請求在從 Sender 線程發往 Kafka 以前還會保存到 InFlightRequests 中,InFlightRequests 保存對象的具體形式爲 Map<NodeId, Deque>,它的主要做用是緩存了已經發出去但尚未收到響應的請求(NodeId 是一個 String 類型,表示節點的 id 編號)。
生產者攔截器既能夠用來在消息發送前作一些準備工做,好比按照某個規則過濾不符合要求的消息、修改消息的內容等,也能夠用來在發送回調邏輯前作一些定製化的需求,好比統計類工做。
生產者攔截器的使用也很方便,主要是自定義實現 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3個方法:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); public void onAcknowledgement(RecordMetadata metadata, Exception exception); public void close();
KafkaProducer 在將消息序列化和計算分區以前會調用生產者攔截器的 onSend() 方法來對消息進行相應的定製化操做。通常來講最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息。
KafkaProducer 會在消息被應答(Acknowledgement)以前或消息發送失敗時調用生產者攔截器的 onAcknowledgement() 方法,優先於用戶設定的 Callback 以前執行。這個方法運行在 Producer 的I/O線程中,因此這個方法中實現的代碼邏輯越簡單越好,不然會影響消息的發送速度。
close() 方法主要用於在關閉攔截器時執行一些資源的清理工做。
生產者須要用序列化器(Serializer)把對象轉換成字節數組才能經過網絡發送給 Kafka。而在對側,消費者須要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的對象。
生產者使用的序列化器和消費者使用的反序列化器是須要一一對應的,若是生產者使用了某種序列化器,好比 StringSerializer,而消費者使用了另外一種序列化器,好比 IntegerSerializer,那麼是沒法解析出想要的數據的。
序列化器都須要實現org.apache.kafka.common.serialization.Serializer 接口,此接口有3個方法:
public void configure(Map<String, ?> configs, boolean isKey) public byte[] serialize(String topic, T data) public void close()
configure() 方法用來配置當前類,serialize() 方法用來執行序列化操做。而 close() 方法用來關閉當前的序列化器。
以下:
public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue != null && encodingValue instanceof String) encoding = (String) encodingValue; } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing " + "string to byte[] due to unsupported encoding " + encoding); } } @Override public void close() { // nothing to do } }
configure() 方法,這個方法是在建立 KafkaProducer 實例的時候調用的,主要用來肯定編碼類型。
serialize用來編解碼,若是 Kafka 客戶端提供的幾種序列化器都沒法知足應用需求,則能夠選擇使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具來實現,或者使用自定義類型的序列化器來實現。
消息通過序列化以後就須要肯定它發往的分區,若是消息 ProducerRecord 中指定了 partition 字段,那麼就不須要分區器的做用,由於 partition 表明的就是所要發往的分區號。
若是消息 ProducerRecord 中沒有指定 partition 字段,那麼就須要依賴分區器,根據 key 這個字段來計算 partition 的值。分區器的做用就是爲消息分配分區。
Kafka 中提供的默認分區器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它實現了 org.apache.kafka.clients.producer.Partitioner 接口,這個接口中定義了2個方法,具體以下所示。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); public void close();
其中 partition() 方法用來計算分區號,返回值爲 int 類型。partition() 方法中的參數分別表示主題、鍵、序列化後的鍵、值、序列化後的值,以及集羣的元數據信息,經過這些信息能夠實現功能豐富的分區器。close() 方法在關閉分區器的時候用來回收一些資源。
在默認分區器 DefaultPartitioner 的實現中,close() 是空方法,而在 partition() 方法中定義了主要的分區分配邏輯。若是 key 不爲 null,那麼默認的分區器會對 key 進行哈希,最終根據獲得的哈希值來計算分區號,擁有相同 key 的消息會被寫入同一個分區。若是 key 爲 null,那麼消息將會以輪詢的方式發往主題內的各個可用分區。
自定義的分區器,只需同 DefaultPartitioner 同樣實現 Partitioner 接口便可。因爲每一個分區下的消息處理都是有順序的,咱們能夠利用自定義分區器實如今某一系列的key都發送到一個分區中,從而實現有序消費。
在Kafka的架構中,會有不少客戶端向Broker端發送請求,Kafka 的 Broker 端有個 SocketServer 組件,用來和客戶端創建鏈接,而後經過Acceptor線程來進行請求的分發,因爲Acceptor不涉及具體的邏輯處理,很是得輕量級,所以有很高的吞吐量。
接着Acceptor 線程採用輪詢的方式將入站請求公平地發到全部網絡線程中,網絡線程池默認大小是 3個,表示每臺 Broker 啓動時會建立 3 個網絡線程,專門處理客戶端發送的請求,能夠經過Broker 端參數 num.network.threads來進行修改。
那麼接下來處理網絡線程處理流程以下:
當網絡線程拿到請求後,會將請求放入到一個共享請求隊列中。Broker 端還有個 IO 線程池,負責從該隊列中取出請求,執行真正的處理。若是是 PRODUCE 生產請求,則將消息寫入到底層的磁盤日誌中;若是是 FETCH 請求,則從磁盤或頁緩存中讀取消息。
IO 線程池處中的線程是執行請求邏輯的線程,默認是8,表示每臺 Broker 啓動後自動建立 8 個 IO 線程處理請求,能夠經過Broker 端參數 num.io.threads調整。
Purgatory組件是用來緩存延時請求(Delayed Request)的。好比設置了 acks=all 的 PRODUCE 請求,一旦設置了 acks=all,那麼該請求就必須等待 ISR 中全部副本都接收了消息後才能返回,此時處理該請求的 IO 線程就必須等待其餘 Broker 的寫入結果。
在 Kafka 集羣中會有一個或多個 broker,其中有一個 broker 會被選舉爲控制器(Kafka Controller),它負責管理整個集羣中全部分區和副本的狀態。
Broker 在啓動時,會嘗試去 ZooKeeper 中建立 /controller 節點。Kafka 當前選舉控制器的規則是:第一個成功建立 /controller 節點的 Broker 會被指定爲控制器。
在ZooKeeper中的 /controller_epoch 節點中存放的是一個整型的 controller_epoch 值。controller_epoch 用於記錄控制器發生變動的次數,即記錄當前的控制器是第幾代控制器,咱們也能夠稱之爲「控制器的紀元」。
controller_epoch 的初始值爲1,即集羣中第一個控制器的紀元爲1,當控制器發生變動時,每選出一個新的控制器就將該字段值加1。Kafka 經過 controller_epoch 來保證控制器的惟一性,進而保證相關操做的一致性。
每一個和控制器交互的請求都會攜帶 controller_epoch 這個字段,若是請求的 controller_epoch 值小於內存中的 controller_epoch 值,則認爲這個請求是向已通過期的控制器所發送的請求,那麼這個請求會被認定爲無效的請求。
若是請求的 controller_epoch 值大於內存中的 controller_epoch 值,那麼說明已經有新的控制器當選了。
主題管理(建立、刪除、增長分區)
分區重分配
Preferred 領導者選舉
Preferred 領導者選舉主要是 Kafka 爲了不部分 Broker 負載太重而提供的一種換 Leader 的方案。
集羣成員管理(新增 Broker、Broker 主動關閉、Broker 宕機)
控制器組件會利用 Watch 機制檢查 ZooKeeper 的 /brokers/ids 節點下的子節點數量變動。目前,當有新 Broker 啓動後,它會在 /brokers 下建立專屬的 znode 節點。一旦建立完畢,ZooKeeper 會經過 Watch 機制將消息通知推送給控制器,這樣,控制器就能自動地感知到這個變化,進而開啓後續的新增 Broker 做業。
數據服務
控制器上保存了最全的集羣元數據信息。
當運行中的控制器忽然宕機或意外終止時,Kafka 可以快速地感知到,並當即啓用備用控制器來代替以前失敗的控制器。這個過程就被稱爲 Failover,該過程是自動完成的,無需你手動干預。
在Kafka中,每一個消費者都有一個對應的消費組。當消息發佈到主題後,只會被投遞給訂閱它的每一個消費組中的一個消費者。每一個消費者只能消費所分配到的分區中的消息。而每個分區只能被一個消費組中的一個消費者所消費。
入上圖所示,咱們能夠設置兩個消費者組來實現廣播消息的做用,消費組A和組B均可以接受到生產者發送過來的消息。
消費者與消費組這種模型可讓總體的消費能力具有橫向伸縮性,咱們能夠增長(或減小)消費者的個數來提升(或下降)總體的消費能力。對於分區數固定的狀況,一味地增長消費者並不會讓消費能力一直獲得提高,若是消費者過多,出現了消費者的個數大於分區個數的狀況,就會有消費者分配不到任何分區。
以下:一共有8個消費者,7個分區,那麼最後的消費者C7因爲分配不到任何分區而沒法消費任何消息。
Kafka 提供了消費者客戶端參數 partition.assignment.strategy 來設置消費者與訂閱主題之間的分區分配策略。
RangeAssignor分配策略
默認狀況下,採用 RangeAssignor 分配策略。
RangeAssignor 分配策略的原理是按照消費者總數和分區總數進行整除運算來得到一個跨度,而後將分區按照跨度進行平均分配,以保證分區儘量均勻地分配給全部的消費者。對於每個主題,RangeAssignor 策略會將消費組內全部訂閱這個主題的消費者按照名稱的字典序排序,而後爲每一個消費者劃分固定的分區範圍,若是不夠平均分配,那麼字典序靠前的消費者會被多分配一個分區。
假設消費組內有2個消費者 C0 和 C1,都訂閱了主題 t0 和 t1,而且每一個主題都有4個分區,那麼訂閱的全部分區能夠標識爲:t0p0、t0p一、t0p二、t0p三、t1p0、t1p一、t1p二、t1p3。最終的分配結果爲:
消費者C0:t0p0、t0p一、t1p0、t1p1 消費者C1:t0p二、t0p三、t1p二、t1p3
假設上面例子中2個主題都只有3個分區,那麼訂閱的全部分區能夠標識爲:t0p0、t0p一、t0p二、t1p0、t1p一、t1p2。最終的分配結果爲:
消費者C0:t0p0、t0p一、t1p0、t1p1 消費者C1:t0p二、t1p2
能夠明顯地看到這樣的分配並不均勻。
RoundRobinAssignor分配策略
RoundRobinAssignor 分配策略的原理是將消費組內全部消費者及消費者訂閱的全部主題的分區按照字典序排序,而後經過輪詢方式逐個將分區依次分配給每一個消費者。
若是同一個消費組內全部的消費者的訂閱信息都是相同的,那麼 RoundRobinAssignor 分配策略的分區分配會是均勻的。
若是同一個消費組內的消費者訂閱的信息是不相同的,那麼在執行分區分配的時候就不是徹底的輪詢分配,有可能致使分區分配得不均勻。
假設消費組內有3個消費者(C0、C1 和 C2),t0、t0、t一、t2主題分別有一、二、3個分區,即整個消費組訂閱了 t0p0、t1p0、t1p一、t2p0、t2p一、t2p2 這6個分區。
具體而言,消費者 C0 訂閱的是主題 t0,消費者 C1 訂閱的是主題 t0 和 t1,消費者 C2 訂閱的是主題 t0、t1 和 t2,那麼最終的分配結果爲:
消費者C0:t0p0 消費者C1:t1p0 消費者C2:t1p一、t2p0、t2p一、t2p2
能夠看 到 RoundRobinAssignor 策略也不是十分完美,這樣分配其實並非最優解,由於徹底能夠將分區 t1p1 分配給消費者 C1。
StickyAssignor分配策略
這種分配策略,它主要有兩個目的:
假設消費組內有3個消費者(C0、C1 和 C2),它們都訂閱了4個主題(t0、t一、t二、t3),而且每一個主題有2個分區。也就是說,整個消費組訂閱了 t0p0、t0p一、t1p0、t1p一、t2p0、t2p一、t3p0、t3p1 這8個分區。最終的分配結果以下:
消費者C0:t0p0、t1p一、t3p0 消費者C1:t0p一、t2p0、t3p1 消費者C2:t1p0、t2p1
再假設此時消費者 C1 脫離了消費組,那麼分配結果爲:
消費者C0:t0p0、t1p一、t3p0、t2p0 消費者C2:t1p0、t2p一、t0p一、t3p1
StickyAssignor 分配策略如同其名稱中的「sticky」同樣,讓分配策略具有必定的「黏性」,儘量地讓先後兩次分配相同,進而減小系統資源的損耗及其餘異常狀況的發生。
再均衡是指分區的所屬權從一個消費者轉移到另外一消費者的行爲,它爲消費組具有高可用性和伸縮性提供保障,使咱們能夠既方便又安全地刪除消費組內的消費者或往消費組內添加消費者。
弊端:
Rebalance 發生的時機有三個:
後兩類一般是業務的變更調整所致使的,咱們通常不可控制,咱們主要說說由於組成員數量變化而引起的 Rebalance 該如何避免。
當 Consumer Group 完成 Rebalance 以後,每一個 Consumer 實例都會按期地向 Coordinator 發送心跳請求,代表它還存活着。若是某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認爲該 Consumer 已經「死」了,從而將其從 Group 中移除,而後開啓新一輪 Rebalance。
Consumer端能夠設置session.timeout.ms,默認是10s,表示若是 Coordinator 在 10 秒以內沒有收到 Group 下某 Consumer 實例的心跳,它就會認爲這個 Consumer 實例已經掛了。
Consumer端還能夠設置heartbeat.interval.ms,表示發送心跳請求的頻率。
以及max.poll.interval.ms 參數,它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示你的 Consumer 程序若是在 5 分鐘以內沒法消費完 poll 方法返回的消息,那麼 Consumer 會主動發起「離開組」的請求,Coordinator 也會開啓新一輪 Rebalance。
因此知道了上面幾個參數後,咱們就能夠避免如下兩個問題:
重平衡過程是靠消費者端的心跳線程(Heartbeat Thread),通知到其餘消費者實例的。
當協調者決定開啓新一輪重平衡後,它會將「REBALANCE_IN_PROGRESS」封裝進心跳請求的響應中,發還給消費者實例。當消費者實例發現心跳響應中包含了「REBALANCE_IN_PROGRESS」,就能立馬知道重平衡又開始了,這就是重平衡的通知機制。
因此,實際上heartbeat.interval.ms不止是設置了心跳的間隔時間,還能夠控制重平衡通知的頻率。
重平衡一旦開啓,Broker 端的協調者組件就要完成整個重平衡流程,Kafka 設計了一套消費者組狀態機(State Machine)來實現。
Kafka 爲消費者組定義了 5 種狀態,它們分別是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
狀態機的各個狀態流轉:
當有新成員加入或已有成員退出時,消費者組的狀態從 Stable 直接跳到 PreparingRebalance 狀態,此時,全部現存成員就必須從新申請加入組。當全部成員都退出組後,消費者組狀態變動爲 Empty。Kafka 按期自動刪除過時位移的條件就是,組要處於 Empty 狀態。所以,若是你的消費者組停掉了很長時間(超過 7 天),那麼 Kafka 極可能就把該組的位移數據刪除了。
GroupCoordinator 是 Kafka 服務端中用於管理消費組的組件。協調器最重要的職責就是負責執行消費者再均衡的操做。
在消費者端,重平衡分爲兩個步驟:分別是加入組和等待領導者消費者(Leader Consumer)分配方案。即JoinGroup 請求和 SyncGroup 請求。
加入組
當組內成員加入組時,它會向協調器發送 JoinGroup 請求。在該請求中,每一個成員都要將本身訂閱的主題上報,這樣協調器就能收集到全部成員的訂閱信息。
選擇消費組領導者
一旦收集了所有成員的 JoinGroup 請求後,協調者會從這些成員中選擇一個擔任這個消費者組的領導者。
這裏的領導者是具體的消費者實例,它既不是副本,也不是協調器。領導者消費者的任務是收集全部成員的訂閱信息,而後根據這些信息,制定具體的分區消費分配方案。
選舉分區分配策略
這個分區分配的選舉是根據消費組內的各個消費者投票來決定的。
協調器會收集各個消費者支持的全部分配策略,組成候選集 candidates。每一個消費者從候選集 candidates 中找出第一個自身支持的策略,爲這個策略投上一票。計算候選集中各個策略的選票數,選票數最多的策略即爲當前消費組的分配策略。
若是有消費者並不支持選出的分配策略,那麼就會報出異常 IllegalArgumentException:Member does not support protocol。
發送 SyncGroup 請求
協調器會把消費者組訂閱信息封裝進 JoinGroup 請求的響應體中,而後發給領導者,由領導者統一作出分配方案,而後領導者發送 SyncGroup 請求給協調器。
響應SyncGroup 組內全部的消費者都會發送一個 SyncGroup 請求,只不過不是領導者的請求內容爲空,而後就會接收到一個SyncGroup響應,接受訂閱信息。