kafka的分區分配策略

用過 Kafka 的同窗應該都知道,每一個 Topic 通常會有不少個 partitions。爲了使得咱們可以及時消費消息,咱們也可能會啓動多個 Consumer 去消費,而每一個 Consumer 又會啓動一個或多個streams去分別消費 Topic 對應分區中的數據。咱們又知道,Kafka 存在 Consumer Group 的概念,也就是 group.id 同樣的 Consumer,這些 Consumer 屬於同一個Consumer Group,組內的全部消費者協調在一塊兒來消費訂閱主題(subscribed topics)的全部分區(partition)。固然,每一個分區只能由同一個消費組內的一個consumer來消費。那麼問題來了,同一個 Consumer Group 裏面的 Consumer 是如何知道該消費哪些分區裏面的數據呢?微信

Kafka Partition Assignment Strategy
若是想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共賬號: iteblog_hadoop

如上圖,Consumer1 爲啥消費的是 Partition0 和 Partition2,而不是 Partition0 和 Partition3?這就涉及到 Kafka內部分區分配策略(Partition Assignment Strategy)了。dom

在 Kafka 內部存在兩種默認的分區分配策略:Range 和 RoundRobin。當如下事件發生時,Kafka 將會進行一次分區分配:oop

  • 同一個 Consumer Group 內新增消費者
  • 消費者離開當前所屬的Consumer Group,包括shuts down 或 crashes
  • 訂閱的主題新增分區

將分區的全部權從一個消費者移到另外一個消費者稱爲從新平衡(rebalance),如何rebalance就涉及到本文提到的分區分配策略。下面咱們將詳細介紹 Kafka 內置的兩種分區分配策略。本文假設咱們有個名爲 T1 的主題,其包含了10個分區,而後咱們有兩個消費者(C1,C2)來消費這10個分區裏面的數據,並且 C1 的 num.streams = 1,C2 的 num.streams = 2。post

Range strategy

Range策略是對每一個主題而言的,首先對同一個主題裏面的分區按照序號進行排序,並對消費者按照字母順序進行排序。在咱們的例子裏面,排完序的分區將會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C2-1。而後將partitions的個數除於消費者線程的總數來決定每一個消費者線程消費幾個分區。若是除不盡,那麼前面幾個消費者線程將會多消費一個分區。在咱們的例子裏面,咱們有10個分區,3個消費者線程, 10 / 3 = 3,並且除不盡,那麼消費者線程 C1-0 將會多消費一個分區,因此最後分區分配的結果看起來是這樣的:spa

C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6 分區
C2-1 將消費 7, 8, 9 分區線程

假如咱們有11個分區,那麼最後分區分配的結果看起來是這樣的:scala

C1-0 將消費 0, 1, 2, 3 分區
C2-0 將消費 4, 5, 6, 7 分區
C2-1 將消費 8, 9, 10 分區code

假如咱們有2個主題(T1和T2),分別有10個分區,那麼最後分區分配的結果看起來是這樣的:orm

C1-0 將消費 T1主題的 0, 1, 2, 3 分區以及 T2主題的 0, 1, 2, 3分區
C2-0 將消費 T1主題的 4, 5, 6 分區以及 T2主題的 4, 5, 6分區
C2-1 將消費 T1主題的 7, 8, 9 分區以及 T2主題的 7, 8, 9分區blog

能夠看出,C1-0 消費者線程比其餘消費者線程多消費了2個分區,這就是Range strategy的一個很明顯的弊端。

RoundRobin strategy

使用RoundRobin策略有兩個前提條件必須知足:

  • 同一個Consumer Group裏面的全部消費者的num.streams必須相等;
  • 每一個消費者訂閱的主題必須相同。

因此這裏假設前面提到的2個消費者的num.streams = 2。RoundRobin策略的工做原理:將全部主題的分區組成 TopicAndPartition 列表,而後對 TopicAndPartition 列表按照 hashCode 進行排序,這裏文字可能說不清,看下面的代碼應該會明白:(其實就是按分區名hash排序後平均分配給每個消費者的線程)

val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) = >
   info( "Consumer %s rebalancing the following partitions for topic %s: %s"
        .format(ctx.consumerId, topic, partitions))
   partitions.map(partition = > {
     TopicAndPartition(topic, partition)
   })
}.toSeq.sortWith((topicPartition 1 , topicPartition 2 ) = > {
   /*
    * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
    * up on one consumer (if it has a high enough stream count).
    */
   topicPartition 1 .toString.hashCode < topicPartition 2 .toString.hashCode
})

最後按照round-robin風格將分區分別分配給不一樣的消費者線程。

在咱們的例子裏面,假如按照 hashCode 排序完的topic-partitions組依次爲T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,咱們的消費者線程排序爲C1-0, C1-1, C2-0, C2-1,最後分區分配的結果爲:

C1-0 將消費 T1-5, T1-2, T1-6 分區;
C1-1 將消費 T1-3, T1-1, T1-9 分區;
C2-0 將消費 T1-0, T1-4 分區;
C2-1 將消費 T1-8, T1-7 分區;

多個主題的分區分配和單個主題相似,這裏就不在介紹了。

根據上面的詳細介紹相信你們已經對Kafka的分區分配策略原理很清楚了。不過遺憾的是,目前咱們還不能自定義分區分配策略,只能經過partition.assignment.strategy參數選擇 range 或 roundrobin。partition.assignment.strategy參數默認的值是range。

相關文章
相關標籤/搜索