如何肯定Kafka的分區數、key和consumer線程數

【原創】如何肯定Kafka的分區數、key和consumer線程數

 
在Kafak中國社區的qq羣中,這個問題被說起的比例是至關高的,這也是Kafka用戶最常碰到的問題之一。本文結合Kafka源碼試圖對該問題相關的因素進行探討。但願對你們有所幫助。
 
怎麼肯定分區數?
    「我應該選擇幾個分區?」——若是你在Kafka中國社區的羣裏,這樣的問題你會常常碰到的。不過有些遺憾的是,咱們彷佛並無很權威的答案可以解答這樣的問題。其實這也不奇怪,畢竟這樣的問題一般都是沒有固定答案的。Kafka官網上標榜本身是"high-throughput distributed messaging system",即一個高吞吐量的分佈式消息引擎。那麼怎麼達到高吞吐量呢?Kafka在底層摒棄了Java堆緩存機制,採用了操做系統級別的頁緩存,同時將隨機寫操做改成順序寫,再結合Zero-Copy的特性極大地改善了IO性能。可是,這只是一個方面,畢竟單機優化的能力是有上限的。如何經過水平擴展甚至是線性擴展來進一步提高吞吐量呢? Kafka就是使用了分區(partition),經過將topic的消息打散到多個分區並分佈保存在不一樣的broker上實現了消息處理(不論是producer仍是consumer)的高吞吐量。
    Kafka的生產者和消費者均可以多線程地並行操做,而每一個線程處理的是一個分區的數據。所以分區其實是調優Kafka並行度的最小單元。對於producer而言,它其實是用多個線程併發地向不一樣分區所在的broker發起Socket鏈接同時給這些分區發送消息;而consumer呢,同一個消費組內的全部consumer線程都被指定topic的某一個分區進行消費(具體如何肯定consumer線程數目咱們後面會詳細說明)。因此說,若是一個topic分區越多,理論上整個集羣所能達到的吞吐量就越大。
    但分區是否越多越好呢?顯然也不是,由於每一個分區都有本身的開銷:
1、客戶端/服務器端須要使用的內存就越多
    先說說客戶端的狀況。Kafka 0.8.2以後推出了Java版的全新的producer,這個producer有個參數batch.size,默認是16KB。它會爲每一個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個可以提高性能的設計。不過很顯然,由於這個參數是分區級別的,若是分區數越多,這部分緩存所需的內存佔用也會更多。假設你有10000個分區,按照默認設置,這部分緩存須要佔用約157MB的內存。而consumer端呢?咱們拋開獲取數據所需的內存不說,只說線程的開銷。若是仍是假設有10000個分區,同時consumer線程數要匹配分區數(大部分狀況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要建立10000個線程,也須要建立大約10000個Socket去獲取分區數據。這裏面的線程切換的開銷自己已經不容小覷了。
    服務器端的開銷也不小,若是閱讀Kafka源碼的話能夠發現,服務器端的不少組件都在內存中維護了分區級別的緩存,好比controller,FetcherManager等,所以分區數越多,這種緩存的成本越久越大。
2、文件句柄的開銷
    每一個分區在底層文件系統都有屬於本身的一個目錄。該目錄下一般會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會爲每一個broker都保存這兩個文件句柄(file handler)。很明顯,若是分區數越多,所須要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。
3、下降高可用性
    Kafka經過副本(replica)機制來保證高可用。具體作法就是爲每一個分區保存若干個副本(replica_factor指定副本數)。每一個副本保存在不一樣的broker上。期中的一個副本充當leader 副本,負責處理producer和consumer請求。其餘副本充當follower角色,由Kafka controller負責保證與leader的同步。若是leader所在的broker掛掉了,contorller會檢測到而後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分狀況下可能只是幾毫秒級別。但若是你有10000個分區,10個broker,也就是說平均每一個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller須要當即對這1000個分區進行leader選舉。比起不多的分區leader選舉而言,這必然要花更長的時間,而且一般不是線性累加的。若是這個broker還同時是controller狀況就更糟了。
  說了這麼多「廢話」,不少人確定已經不耐煩了。那你說到底要怎麼肯定分區數呢?答案就是:視狀況而定。基本上你仍是須要經過一系列實驗和測試來肯定。固然測試的依據應該是吞吐量。雖然LinkedIn這篇文章作了Kafka的基準測試,但它的結果其實對你意義不大,由於不一樣的硬件、軟件、負載狀況測試出來的結果必然不同。我常常碰到的問題相似於,官網說每秒能到10MB,爲何個人producer每秒才1MB? —— 且不說硬件條件,最後發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,所以根本沒有可比性。不過你依然能夠遵循必定的步驟來嘗試肯定分區數:建立一個只有1個分區的topic,而後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位能夠是MB/s。而後假設總的目標吞吐量是Tt,那麼分區數 =  Tt / max(Tp, Tc)
    Tp表示producer的吞吐量。測試producer一般是很容易的,由於它的邏輯很是簡單,就是直接發送消息到Kafka就行了。Tc表示consumer的吞吐量。測試Tc一般與應用的關係更大, 由於Tc的值取決於你拿到消息以後執行什麼操做,所以Tc的測試一般也要麻煩一些。
    另外,Kafka並不能真正地作到線性擴展(其實任何系統都不能),因此你在規劃你的分區數的時候最好多規劃一下,這樣將來擴展時候也更加方便。
 
消息-分區的分配
默認狀況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions,以下圖所示:
def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

  這就保證了相同key的消息必定會被路由到相同的分區。若是你沒有指定key,那麼Kafka是如何肯定這條消息去往哪一個分區的呢?html

複製代碼
if(key == null) {  // 若是沒有指定key
        val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有沒有緩存的現成的分區Id
        id match {
          case Some(partitionId) =>  
            partitionId  // 若是有的話直接使用這個分區Id就行了
          case None => // 若是沒有的話,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出全部可用分區的leader所在的broker
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 從中隨機挑一個
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用
            partitionId
        }
      }
複製代碼

  能夠看出,Kafka幾乎就是隨機找一個分區發送無key的消息,而後把這個分區號加入到緩存中以備後面直接使用——固然了,Kafka自己也會清空該緩存(默認每10分鐘或每次請求topic元數據時)算法

如何設定consumer線程數
    我我的的觀點,若是你的分區數是N,那麼最好線程數也保持爲N,這樣一般可以達到最大的吞吐量。超過N的配置只是浪費系統資源,由於多出的線程不會被分配到任何分區。讓咱們來看看具體Kafka是如何分配的。
    topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之並不成立,即一個consumer線程能夠消費多個分區的數據,好比Kafka提供的ConsoleConsumer,默認就只是一個線程來消費全部分區的數據。——其實ConsoleConsumer可使用通配符的功能實現同時消費多個topic數據,但這和本文無關。
    再討論分配策略以前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,因此在沒有新消息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來。——你固然能夠配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。
    下面說說Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那麼每一個線程都分配哪些分區呢?
 
C0 消費分區 0, 1, 2, 3
C1 消費分區 4, 5, 6
C2 消費分區 7, 8, 9
  
具體算法就是:
複製代碼
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每一個consumer至少保證消費的分區數
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分區須要單獨分配給開頭的線程們
...
for (consumerThreadId <- consumerThreadIdSet) {   // 對於每個consumer線程
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)  //算出該線程在全部線程中的位置,介於[0, n-1]
        assert(myConsumerPosition >= 0)
// startPart 就是這個線程要消費的起始分區數
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是這個線程總共要消費多少個分區
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
...
}
複製代碼

針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart爲10%3=1,說明每一個線程至少保證3個分區,還剩下1個分區須要單獨分配給開頭的若干個線程。這就是爲何C0消費4個分區,後面的2個線程每一個消費3個分區,具體過程詳見下面的Debug截圖信息:緩存

 ctx.myTopicThreadIds
nPartsPerConsumer = 10 / 3  = 3
nConsumersWithExtraPart = 10 % 3 = 1
第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分區4開始讀
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分區, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0  --- 從分區0開始讀
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分區,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 從分區7開始讀
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分區,即7, 8, 9
至此10個分區都已經分配完畢
 
說到這裏,常常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其餘的分區。坦率來講,目前Kafka並無提供自定義分配策略。作到這點很難,但仔細想想,也許咱們指望Kafka作的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許並非Kafka該作的事情。
相關文章
相關標籤/搜索