kafka使用分區將topic的消息打散到多個分區分佈保存在不一樣的broker上,實現了producer和consumer消息處理的高吞吐量。Kafka的producer和consumer均可以多線程地並行操做,而每一個線程處理的是一個分區的數據。所以分區其實是調優Kafka並行度的最小單元。對於producer而言,它其實是用多個線程併發地向不一樣分區所在的broker發起Socket鏈接同時給這些分區發送消息;而consumer,同一個消費組內的全部consumer線程都被指定topic的某一個分區進行消費。javascript
因此說,若是一個topic分區越多,理論上整個集羣所能達到的吞吐量就越大。java
分區是否越多越好呢?顯然也不是,由於每一個分區都有本身的開銷:python
1、客戶端/服務器端須要使用的內存就越多 Kafka0.8.2以後,在客戶端producer有個參數batch.size,默認是16KB。它會爲每一個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個可以提高性能的設計。不過很顯然,由於這個參數是分區級別的,若是分區數越多,這部分緩存所需的內存佔用也會更多。假設你有10000個分區,按照默認設置,這部分緩存須要佔用約157MB的內存。而consumer端呢?咱們拋開獲取數據所需的內存不說,只說線程的開銷。若是仍是假設有10000個分區,同時consumer線程數要匹配分區數(大部分狀況下是最佳的消費吞吐量配置)的話,那麼在consumer client就要建立10000個線程,也須要建立大約10000個Socket去獲取分區數據。這裏面的線程切換的開銷自己已經不容小覷了。
服務器端的開銷也不小,若是閱讀Kafka源碼的話能夠發現,服務器端的不少組件都在內存中維護了分區級別的緩存,好比controller,FetcherManager等,所以分區數越多,這種緩存的成本就越大。
2、文件句柄的開銷 每一個分區在底層文件系統都有屬於本身的一個目錄。該目錄下一般會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會爲每一個broker都保存這兩個文件句柄(file handler)。很明顯,若是分區數越多,所須要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit -n的限制。
3、下降高可用性 Kafka經過副本(replica)機制來保證高可用。具體作法就是爲每一個分區保存若干個副本(replica_factor指定副本數)。每一個副本保存在不一樣的broker上。期中的一個副本充當leader 副本,負責處理producer和consumer請求。其餘副本充當follower角色,由Kafka controller負責保證與leader的同步。若是leader所在的broker掛掉了,contorller會檢測到而後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分狀況下可能只是幾毫秒級別。但若是你有10000個分區,10個broker,也就是說平均每一個broker上有1000個分區。此時這個broker掛掉了,那麼zookeeper和controller須要當即對這1000個分區進行leader選舉。比起不多的分區leader選舉而言,這必然要花更長的時間,而且一般不是線性累加的。若是這個broker還同時是controller狀況就更糟了。緩存
能夠遵循必定的步驟來嘗試肯定分區數:建立一個只有1個分區的topic,而後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位能夠是MB/s。而後假設總的目標吞吐量是Tt,那麼分區數 = Tt / max(Tp, Tc)服務器
說明:Tp表示producer的吞吐量。測試producer一般是很容易的,由於它的邏輯很是簡單,就是直接發送消息到Kafka就行了。Tc表示consumer的吞吐量。測試Tc一般與應用的關係更大, 由於Tc的值取決於你拿到消息以後執行什麼操做,所以Tc的測試一般也要麻煩一些。多線程
默認狀況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions:併發
def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions }
這保證了相同key的消息必定會被路由到相同的分區。dom
若是你沒有指定key,那麼Kafka是如何肯定這條消息去往哪一個分區的呢?性能
if(key == null) { // 若是沒有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現成的分區Id id match { case Some(partitionId) => partitionId // 若是有的話直接使用這個分區Id就行了 case None => // 若是沒有的話, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出全部可用分區的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑一個 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下一次直接使用 partitionId } }
不指定key時,Kafka幾乎就是隨機找一個分區發送無key的消息,而後把這個分區號加入到緩存中以備後面直接使用——固然了,Kafka自己也會清空該緩存(默認每10分鐘或每次請求topic元數據時)。測試
topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之並不成立,即一個consumer線程能夠消費多個分區的數據,好比Kafka提供的ConsoleConsumer,默認就只是一個線程來消費全部分區的數據。
即分區數決定了同組消費者個數的上限
因此,若是你的分區數是N,那麼最好線程數也保持爲N,這樣一般可以達到最大的吞吐量。超過N的配置只是浪費系統資源,由於多出的線程不會被分配到任何分區。
Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。
當如下事件發生時,Kafka 將會進行一次分區分配:
將分區的全部權從一個消費者移到另外一個消費者稱爲從新平衡(rebalance),如何rebalance就涉及到本文提到的分區分配策略。
下面咱們將詳細介紹 Kafka 內置的兩種分區分配策略。本文假設咱們有個名爲 T1 的主題,其包含了10個分區,而後咱們有兩個消費者(C1,C2)
來消費這10個分區裏面的數據,並且 C1 的 num.streams = 1,C2 的 num.streams = 2。
Range策略是對每一個主題而言的,首先對同一個主題裏面的分區按照序號進行排序,並對消費者按照字母順序進行排序。在咱們的例子裏面,排完序的分區將會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者線程排完序將會是C1-0, C2-0, C2-1。而後將partitions的個數除於消費者線程的總數來決定每一個消費者線程消費幾個分區。若是除不盡,那麼前面幾個消費者線程將會多消費一個分區。在咱們的例子裏面,咱們有10個分區,3個消費者線程, 10 / 3 = 3,並且除不盡,那麼消費者線程 C1-0 將會多消費一個分區,因此最後分區分配的結果看起來是這樣的:
假如咱們有11個分區,那麼最後分區分配的結果看起來是這樣的:
假如咱們有2個主題(T1和T2),分別有10個分區,那麼最後分區分配的結果看起來是這樣的:
能夠看出,C1-0 消費者線程比其餘消費者線程多消費了2個分區,這就是Range strategy的一個很明顯的弊端。
使用RoundRobin策略有兩個前提條件必須知足:
因此這裏假設前面提到的2個消費者的num.streams = 2。RoundRobin策略的工做原理:將全部主題的分區組成 TopicAndPartition 列表,而後對 TopicAndPartition 列表按照 hashCode 進行排序,看下面的代碼應該會明白:
val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => info("Consumer %s rebalancing the following partitions for topic %s: %s" .format(ctx.consumerId, topic, partitions)) partitions.map(partition => { TopicAndPartition(topic, partition) }) }.toSeq.sortWith((topicPartition1, topicPartition2) => { /* * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending * up on one consumer (if it has a high enough stream count). */ topicPartition1.toString.hashCode < topicPartition2.toString.hashCode })
最後按照round-robin風格將分區分別分配給不一樣的消費者線程。
在這個的例子裏面,假如按照 hashCode 排序完的topic-partitions組依次爲T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,咱們的消費者線程排序爲C1-0, C1-1, C2-0, C2-1,最後分區分配的結果爲:
多個主題的分區分配和單個主題相似。遺憾的是,目前咱們還不能自定義分區分配策略,只能經過partition.assignment.strategy參數選擇 range 或 roundrobin。