咱們知道Kafka 的消息經過topic進行分類。topic能夠被分爲若干個partition來存儲消息。消息以追加的方式寫入partition,而後以先入先出的順序讀取。java
下面是topic和partition的關係圖:spa
咱們通常會在server.conf中經過num.partitions參數指定建立topic時包含多少個partition。默認是num.partitions=1。3d
既然一個topic有多個partition,那麼消息是怎麼樣分配到partition的呢?代理
生產者生產一個消息send到topic分區器,分區器會根據消息裏面的分區參數key值把消息分到對應的partition。這裏就像咱們快遞代髮網點同樣,快遞代髮網點能夠代理不少種快遞公司,若是要寄快遞者P(生產者)指定用什麼快遞公司,代髮網點人員C(分區器)就會把該物品M(消息)歸類到指定的快遞公司區域存放。若是P不要求具體的快遞公司寄件,那麼就由C隨意分配快遞公司(哈哈,那就要看這個傢伙的心情了,心情好點給你一個順豐比較快到達,心情很差時就GG吧)。code
下面是Kafka對消息分配分區 DefaultPartitioner.java 類的核心代碼:server
1 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 2 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 3 int numPartitions = partitions.size(); 4 if (keyBytes == null) { 5 int nextValue = counter.getAndIncrement(); 6 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); 7 if (availablePartitions.size() > 0) { 8 int part = Utils.toPositive(nextValue) % availablePartitions.size(); 9 return availablePartitions.get(part).partition(); 10 } else { 11 // no partitions are available, give a non-available partition 12 return Utils.toPositive(nextValue) % numPartitions; 13 } 14 } else { 15 // hash the keyBytes to choose a partition 16 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 17 } 18 }
第四、7行:若是沒有指定key值而且可用分區個數大於0時,在就可用分區中作輪詢決定改消息分配到哪一個partition。blog
第四、10行:若是沒有指定key值而且沒有可用分區時,在全部分區中輪詢決定改消息分配到哪一個partition。rem
第14行:若是指定key值,對key作hash分配到指定的partition。get
因此當同一個key的消息會被分配到同一個partition中。消息在同一個partition處理的順序是FIFO,這就保證了消息的順序性。hash