什麼,kafka可以從follower副本讀數據了 —kafka新功能介紹

最近看了kafka2.4新版本的一些功能特性,不得不說,在kafka2.0之後,kafka自身就比較少推出一些新的feature了,基本都是一些修修補補的東西。卻是kafka connect和kafka stream相關的開發工做作的比較多。可能kafka的野心也不侷限於要當一箇中間件,而是要實現一個流處理系統的生態了。git

此次要介紹的是我以爲比較有意思的兩個特性,一個是kafka支持從follower副本讀取數據,固然這個功能並非爲了提供讀取性能,後面再詳細介紹。另外一個則是新推出的sticky partitioner功能,我猜是從rebalance的StickyAssignor中獲得靈感,發現producer的分區策略也能夠這樣搞,233,這個feature主要做用是提升性能。github

這兩個feature都是kafka2.4.0版本推出的,若是想使用這些新feature,那麼不妨升級下吧~apache

follower副本讀取數據(consumer fetch from closest replica)

背景

在早先kafka的設計中,爲了使consumer讀取數據可以保持一致,是隻容許consumer讀取leader副本的數據的。即follower replica只是單純地備份數據的做用。那推出follower replica fetch功能的背景是什麼呢?服務器

舉個比較常見的場景,kafka存在多個數據中心,不一樣數據中心存在於不一樣的機房,當其中一個數據中心須要向另外一個數據中心同步數據的時候,因爲只能從leader replica消費數據,那麼它不得不進行跨機房獲取數據,而這些流量帶寬一般是比較昂貴的(尤爲是雲服務器)。即沒法利用本地性來減小昂貴的跨機房流量。性能

因此kafka推出這一個功能,就是幫助相似這種場景,節約流量資源。而且這種功能彷佛還能夠和新推出的mirror maker2相互配合,實現多個數據源的數據同步,不過我本身還沒測試過。測試

rack功能介紹

要說follower replica fetch,那就不得不先說rack功能,這個是kafka比較早就推出的功能,是Kafka對機架感知提供了的基本支持,能夠將其用於控制副本的放置,詳細內容能夠參閱這篇Kafka機架感知文章。fetch

使用方式,其實就是一個broker端的參數,broker.rack,這個參數能夠說明當前broker在哪一個機房。設計

舉上面文章中的例子,若是一個數據中心的集羣分佈以下:
kafka-rack功能code

那麼能夠這樣配置:orm

  • broker0 -> rack1
  • broker1 -> rack1
  • broker2 -> rack2
  • broker3 -> rack2

這樣其實就是至關於給broker打一個標籤,當新建topic,好比新建一個兩個副本 & 兩個分區的topic,kafka至少會自動給rack1或rack2分配所有分區的一個副本。什麼,你說要是建立兩個分區一個副本的topic該怎麼分。。。抱歉,我給不了答案。等你本身實踐而後評論跟我說下答案 =。=

replica fetch功能測試

OK,上面介紹的rack功能,咱們就能發現,這個其實跟跨機房讀數據的這種場景是很搭的。在跨機房多數據中心場景中,若是數據中心A,一個副本放在數據中心B的機房中,只要讓數據中心B的consumer可以讀數據中心A的那個replica的數據(follower副本)讀數據,那不就萬事大吉。

社區也是這樣想的,因此就推出了這個功能。讓消費者能夠指定rack id,而後能夠不從消費者讀取數據。要實現這個目的,須要先配置兩個參數:

replica.selector.class

  • broker端配置
  • 配置名:replica.selector.class
  • 解釋:ReplicaSelector實現類的全名,包括路徑 (好比 RackAwareReplicaSelector 即按 rack id 指定消費)
  • 默認:從 Leader 消費的 LeaderSelector

爲了支持這個功能,kafka修改了這部分的接口實現,源碼中新增一個ReplicaSelector接口,若是用戶有自定義消費策略的需求,也能夠繼承這個接口實現本身的功能。

目前這個接口有兩個實現類,一個是LeaderSelector,即從leader副本讀數據。另外一個則是RackAwareReplicaSelector,會去到指定的rack id讀數據。

client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解釋:這個參數須要和broker端指定的broker.rack相同,表示去哪一個rack中獲取數據。
  • 默認:null

這個參數只有在上面的replica.selector.class指定爲RackAwareReplicaSelector且broekr指定了broker.rack纔會生效。

這個功能要測試也挺簡單的,能夠直接搭建一個兩個broker的kafka集羣,配置broker.rack,而後使用consumer客戶端指定client.rack發送到非leader的節點查數據就好了。另外,可使用這條命令查看網卡流量信息:

sar -n DEV 1 300

存在問題

從follower replica讀取數據確定有問題,最可能的問題就是落後節點的問題,從這樣的節點讀取數據會面臨什麼樣的狀況呢?官方給出了幾種場景及解決辦法。先看看這張圖
img

主要有四種可能出現問題的狀況,咱們分別來看看應該如何解決:

Case 1(uncommitted offset)

這個場景是follower接收到數據但還未committed offset,這個時候,若消費者的offet消費到high watemark到log end offset之間的那段(Case 1黃色那段),會返回空數據,而不是一個錯誤信息。直到這段內容 committed。

case 2(unavailable offset)

這種場景應該發生於慢節點的狀況下,滿節點的broker還未接收到實際數據,但已經跟leader通訊知道有部分數據committed了(case 2黃色部分)。當遇到這種狀況,consumer 消費到時候,會返回 OFFSET_NOT_AVAILABLE 錯誤信息。

case 3(offset too small)

這種狀況可能出如今消費者指定了 offset 的狀況。那麼在指定不一樣auto.offset.reset的時候有不一樣的狀況。

  1. If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error.
  2. If the reset policy is "latest," fetch the log end offset from the leader.
  3. If the reset policy is "none," raise an exception.

case 4(offset too large)

遇到這種狀況,會返回一個 broker 會返回一個 OFFSET_OUT_OF_RANGE 的錯誤。

但 OFFSET_OUT_OF_RANGE 遇到這種錯誤的時候也有多種可能,官方給出當 consumer 遇到這種問題的解決思路,

Use the OffsetForLeaderEpoch API to verify the current position with the leader.

  1. If the fetch offset is still valid, refresh metadata and continue fetching
  2. If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
  3. Otherwise, follow the same steps above as in case 3.

sticky partitioner功能

背景

kafka producer發送數據並非一個一個消息發送,而是取決於兩個producer端參數。一個是linger.ms,默認是0ms,當達到這個時間後,kafka producer就會馬上向broker發送數據。另外一個參數是batch.size,默認是16kb,當產生的消息數達到這個大小後,就會當即向broker發送數據。

按照這個設計,從直觀上思考,確定是但願每次都儘量填滿一個batch再發送到一個分區。但實際決定batch如何造成的一個因素是分區策略(partitioner strategy)。在Kafka2.4版本以前,在producer發送數據默認的分區策略是輪詢策略(沒指定keyd的狀況),這在我之前的文章有說到過詳細解析kafka之kafka分區和副本。若是多條消息不是被髮送到相同的分區,它們就不能被放入到一個batch中。

因此若是使用默認的輪詢partition策略,可能會形成一個大的batch被輪詢成多個小的batch的狀況。鑑於此,kafka2.4的時候推出一種新的分區策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy會隨機地選擇另外一個分區並會盡量地堅持使用該分區——即所謂的粘住這個分區。

鑑於小batch可能致使延時增長,以前對於無Key消息的分區策略效率很低。社區於2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。該策略是一種全新的策略,可以顯著地下降給消息指定分區過程當中的延時。

使用Sticky Partitioner有助於改進消息批處理,減小延遲,並減小broker的負載。

功能解析

sticky Partitioner實現的代碼是在UniformStickyPartitioner裏面。貼下代碼看看:

public class UniformStickyPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }

    public void close() {}

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

咱們主要關注UniformStickyPartitioner#partition()方法,能夠看到,它是直接經過一個cache類獲取相同的分區,這表示新的record會一直髮送到同一個分區中,除非生成新的batch,觸發了UniformStickyPartitioner#onNewBatch()方法纔會換分區。

能夠看看RoundRobinPartitioner#partition()方法(即輪詢分區策略)進行對比,就能發現比較明顯的對比。

這個sticky partitioner最大的好處就是性能較好,按照官方給出的測試結果,使用sticky partitioner測試能夠減小50%的延時,吞吐也有相對應的提升。我本身測了下數據基本出入不大。

另外說明下,在kafka2.4之後,默認的partitioner分區策略,已經包含了sticky partitioner了,因此升級到kafka2.4之後,並不須要任何修改就能享受到性能到極大提高。這裏能夠看下kafka2.4版本的策略說明:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

有一點挺奇怪到,在測試過程當中(使用bin/kafka-producer-perf-test.sh測試),發現DefaultPartitioner的性能要比UniformStickyPartitioner的性能要好一些,不肯定是什麼緣由,知道到小夥伴能夠在評論區給出答案:)

參考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner 以上~

相關文章
相關標籤/搜索