最近看了kafka2.4新版本的一些功能特性,不得不說,在kafka2.0之後,kafka自身就比較少推出一些新的feature了,基本都是一些修修補補的東西。卻是kafka connect和kafka stream相關的開發工做作的比較多。可能kafka的野心也不侷限於要當一箇中間件,而是要實現一個流處理系統的生態了。git
此次要介紹的是我以爲比較有意思的兩個特性,一個是kafka支持從follower副本讀取數據,固然這個功能並非爲了提供讀取性能,後面再詳細介紹。另外一個則是新推出的sticky partitioner功能,我猜是從rebalance的StickyAssignor中獲得靈感,發現producer的分區策略也能夠這樣搞,233,這個feature主要做用是提升性能。github
這兩個feature都是kafka2.4.0版本推出的,若是想使用這些新feature,那麼不妨升級下吧~apache
在早先kafka的設計中,爲了使consumer讀取數據可以保持一致,是隻容許consumer讀取leader副本的數據的。即follower replica只是單純地備份數據的做用。那推出follower replica fetch功能的背景是什麼呢?服務器
舉個比較常見的場景,kafka存在多個數據中心,不一樣數據中心存在於不一樣的機房,當其中一個數據中心須要向另外一個數據中心同步數據的時候,因爲只能從leader replica消費數據,那麼它不得不進行跨機房獲取數據,而這些流量帶寬一般是比較昂貴的(尤爲是雲服務器)。即沒法利用本地性來減小昂貴的跨機房流量。性能
因此kafka推出這一個功能,就是幫助相似這種場景,節約流量資源。而且這種功能彷佛還能夠和新推出的mirror maker2相互配合,實現多個數據源的數據同步,不過我本身還沒測試過。測試
要說follower replica fetch,那就不得不先說rack功能,這個是kafka比較早就推出的功能,是Kafka對機架感知提供了的基本支持,能夠將其用於控制副本的放置,詳細內容能夠參閱這篇Kafka機架感知文章。fetch
使用方式,其實就是一個broker端的參數,broker.rack,這個參數能夠說明當前broker在哪一個機房。設計
舉上面文章中的例子,若是一個數據中心的集羣分佈以下:
code
那麼能夠這樣配置:orm
這樣其實就是至關於給broker打一個標籤,當新建topic,好比新建一個兩個副本 & 兩個分區的topic,kafka至少會自動給rack1或rack2分配所有分區的一個副本。什麼,你說要是建立兩個分區一個副本的topic該怎麼分。。。抱歉,我給不了答案。等你本身實踐而後評論跟我說下答案 =。=
OK,上面介紹的rack功能,咱們就能發現,這個其實跟跨機房讀數據的這種場景是很搭的。在跨機房多數據中心場景中,若是數據中心A,一個副本放在數據中心B的機房中,只要讓數據中心B的consumer可以讀數據中心A的那個replica的數據(follower副本)讀數據,那不就萬事大吉。
社區也是這樣想的,因此就推出了這個功能。讓消費者能夠指定rack id,而後能夠不從消費者讀取數據。要實現這個目的,須要先配置兩個參數:
replica.selector.class
爲了支持這個功能,kafka修改了這部分的接口實現,源碼中新增一個ReplicaSelector
接口,若是用戶有自定義消費策略的需求,也能夠繼承這個接口實現本身的功能。
目前這個接口有兩個實現類,一個是LeaderSelector
,即從leader副本讀數據。另外一個則是RackAwareReplicaSelector
,會去到指定的rack id讀數據。
client.rack
broker.rack
相同,表示去哪一個rack中獲取數據。這個參數只有在上面的replica.selector.class
指定爲RackAwareReplicaSelector
且broekr指定了broker.rack
纔會生效。
這個功能要測試也挺簡單的,能夠直接搭建一個兩個broker的kafka集羣,配置broker.rack,而後使用consumer客戶端指定client.rack發送到非leader的節點查數據就好了。另外,可使用這條命令查看網卡流量信息:
sar -n DEV 1 300
從follower replica讀取數據確定有問題,最可能的問題就是落後節點的問題,從這樣的節點讀取數據會面臨什麼樣的狀況呢?官方給出了幾種場景及解決辦法。先看看這張圖
主要有四種可能出現問題的狀況,咱們分別來看看應該如何解決:
Case 1(uncommitted offset)
這個場景是follower接收到數據但還未committed offset,這個時候,若消費者的offet消費到high watemark到log end offset之間的那段(Case 1黃色那段),會返回空數據,而不是一個錯誤信息。直到這段內容 committed。
case 2(unavailable offset)
這種場景應該發生於慢節點的狀況下,滿節點的broker還未接收到實際數據,但已經跟leader通訊知道有部分數據committed了(case 2黃色部分)。當遇到這種狀況,consumer 消費到時候,會返回 OFFSET_NOT_AVAILABLE 錯誤信息。
case 3(offset too small)
這種狀況可能出如今消費者指定了 offset 的狀況。那麼在指定不一樣auto.offset.reset
的時候有不一樣的狀況。
case 4(offset too large)
遇到這種狀況,會返回一個 broker 會返回一個 OFFSET_OUT_OF_RANGE 的錯誤。
但 OFFSET_OUT_OF_RANGE 遇到這種錯誤的時候也有多種可能,官方給出當 consumer 遇到這種問題的解決思路,
Use the OffsetForLeaderEpoch API to verify the current position with the leader.
kafka producer發送數據並非一個一個消息發送,而是取決於兩個producer端參數。一個是linger.ms
,默認是0ms,當達到這個時間後,kafka producer就會馬上向broker發送數據。另外一個參數是batch.size
,默認是16kb,當產生的消息數達到這個大小後,就會當即向broker發送數據。
按照這個設計,從直觀上思考,確定是但願每次都儘量填滿一個batch再發送到一個分區。但實際決定batch如何造成的一個因素是分區策略(partitioner strategy)。在Kafka2.4版本以前,在producer發送數據默認的分區策略是輪詢策略(沒指定keyd的狀況),這在我之前的文章有說到過詳細解析kafka之kafka分區和副本。若是多條消息不是被髮送到相同的分區,它們就不能被放入到一個batch中。
因此若是使用默認的輪詢partition策略,可能會形成一個大的batch被輪詢成多個小的batch的狀況。鑑於此,kafka2.4的時候推出一種新的分區策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy會隨機地選擇另外一個分區並會盡量地堅持使用該分區——即所謂的粘住這個分區。
鑑於小batch可能致使延時增長,以前對於無Key消息的分區策略效率很低。社區於2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。該策略是一種全新的策略,可以顯著地下降給消息指定分區過程當中的延時。
使用Sticky Partitioner有助於改進消息批處理,減小延遲,並減小broker的負載。
sticky Partitioner實現的代碼是在UniformStickyPartitioner
裏面。貼下代碼看看:
public class UniformStickyPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); public void configure(Map<String, ?> configs) {} public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return stickyPartitionCache.partition(topic, cluster); } public void close() {} public void onNewBatch(String topic, Cluster cluster, int prevPartition) { stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } }
咱們主要關注UniformStickyPartitioner#partition()
方法,能夠看到,它是直接經過一個cache類獲取相同的分區,這表示新的record會一直髮送到同一個分區中,除非生成新的batch,觸發了UniformStickyPartitioner#onNewBatch()
方法纔會換分區。
能夠看看RoundRobinPartitioner#partition()
方法(即輪詢分區策略)進行對比,就能發現比較明顯的對比。
這個sticky partitioner最大的好處就是性能較好,按照官方給出的測試結果,使用sticky partitioner測試能夠減小50%的延時,吞吐也有相對應的提升。我本身測了下數據基本出入不大。
另外說明下,在kafka2.4之後,默認的partitioner分區策略,已經包含了sticky partitioner了,因此升級到kafka2.4之後,並不須要任何修改就能享受到性能到極大提高。這裏能夠看下kafka2.4版本的策略說明:
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose the sticky partition that changes when the batch is full. * * See KIP-480 for details about sticky partitioning. */ public class DefaultPartitioner implements Partitioner {
有一點挺奇怪到,在測試過程當中(使用bin/kafka-producer-perf-test.sh測試),發現DefaultPartitioner
的性能要比UniformStickyPartitioner
的性能要好一些,不肯定是什麼緣由,知道到小夥伴能夠在評論區給出答案:)
參考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner 以上~