kafka topic消息分配partition規則(Java源碼)

咱們知道Kafka 的消息經過topic進行分類。topic能夠被分爲若干個partition來存儲消息。消息以追加的方式寫入partition,而後以先入先出的順序讀取。java

下面是topic和partition的關係圖:spa

咱們通常會在server.conf中經過num.partitions參數指定建立topic時包含多少個partition。默認是num.partitions=1。3d

既然一個topic有多個partition,那麼消息是怎麼樣分配到partition的呢?代理

生產者生產一個消息send到topic分區器,分區器會根據消息裏面的分區參數key值把消息分到對應的partition。這裏就像咱們快遞代髮網點同樣,快遞代髮網點能夠代理不少種快遞公司,若是要寄快遞者P(生產者)指定用什麼快遞公司,代髮網點人員C(分區器)就會把該物品M(消息)歸類到指定的快遞公司區域存放。若是P不要求具體的快遞公司寄件,那麼就由C隨意分配快遞公司(哈哈,那就要看這個傢伙的心情了,心情好點給你一個順豐比較快到達,心情很差時就GG吧)。code

下面是Kafka對消息分配分區 DefaultPartitioner.java 類的核心代碼:server

 1     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {  2         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);  3         int numPartitions = partitions.size();  4         if (keyBytes == null) {  5             int nextValue = counter.getAndIncrement();  6             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);  7             if (availablePartitions.size() > 0) {  8                 int part = Utils.toPositive(nextValue) % availablePartitions.size();  9                 return availablePartitions.get(part).partition(); 10             } else { 11                 // no partitions are available, give a non-available partition
12                 return Utils.toPositive(nextValue) % numPartitions; 13  } 14         } else { 15             // hash the keyBytes to choose a partition
16             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 17  } 18     }

第四、7行:若是沒有指定key值而且可用分區個數大於0時,在就可用分區中作輪詢決定改消息分配到哪一個partition。blog

第四、10行:若是沒有指定key值而且沒有可用分區時,在全部分區中輪詢決定改消息分配到哪一個partition。rem

第14行:若是指定key值,對key作hash分配到指定的partition。get

因此當同一個key的消息會被分配到同一個partition中。消息在同一個partition處理的順序是FIFO,這就保證了消息的順序性。hash

相關文章
相關標籤/搜索