kafka Topic 與 Partition

Topic在邏輯上能夠被認爲是一個queue隊列,每條消息都必須指定它的topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲 了使得Kafka的吞吐率能夠水平擴展,物理上把topic分紅一個或多個partition,每一個partition在物理上對應一個文件夾,該文件夾 下存儲這個partition的全部消息和索引文件。
  
  每一個日誌文件都是「log entries」序列,每個log entry包含一個4字節整型數(值爲N),其後跟N個字節的消息體。每條消息都有一個當前partition下惟一的64字節的offset,它指明瞭這條消息的起始位置。磁盤上存儲的消費格式以下:
  message length : 4 bytes (value: 1+4+n)
  「magic」 value : 1 byte
  crc : 4 bytes
  payload : n bytes
  這個「log entries」並不是由一個文件構成,而是分紅多個segment,每一個segment名爲該segment第一條消息的offset和「.kafka」組成。另外會有一個索引文件,它標明瞭每一個segment下包含的log entry的offset範圍,
  由於每條消息都被append到該partition中,是順序寫磁盤,所以效率很是高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
  
  每一條消息被髮送到broker時,會根據paritition規則選擇被存儲到哪個partition。若是partition規則設置的合理, 全部消息能夠均勻分佈到不一樣的partition裏,這樣就實現了水平擴展。(若是一個topic對應一個文件,那這個文件所在的機器I/O將會成爲這個 topic的性能瓶頸,而partition解決了這個問題)。在建立topic時能夠在$KAFKA_HOME/config/server.properties中指定這個partition的數量(以下所示),固然也能夠在topic建立以後去修改parition數量。java

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

  在發送一條消息時,能夠指定這條消息的key,producer根據這個key和partition機制來判斷將這條消息發送到哪一個 parition。paritition機制能夠經過指定producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中若是key能夠被解析爲整數則將對應的整數與partition總數取餘,該消息會被髮送到該數對應的partition。(每一個parition都會有個序號)app

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

    public JasonPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

 
  若是將上例中的class做爲partition.class,並經過以下代碼發送20條消息(key分別爲0,1,2,3)至topic2(包含4個partition)。  ide

public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
        List messageList = new ArrayList<KeyedMessage<String, String>>();
        for(int j = 0; j < 4; j++){
            messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
        }
        producer.send(messageList);
    }
  producer.close();
}

 

  則key相同的消息會被髮送並存儲到同一個partition裏,並且key的序號正好和partition序號相同。(partition序號從0開始,本例中的key也正好從0開始)。以下圖所示。
  
  對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際 上也不必),所以Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。例如能夠經過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的數據,也可經過配置讓Kafka在partition文件超過1GB時刪除舊數據,以下所示。性能

  ############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to 
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs 
#can then be marked for log compaction.
log.cleaner.enable=false

  

  這裏要注意,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除文件與Kafka性能無關,選擇怎樣的刪除策 略只與磁盤以及具體的需求有關。另外,Kafka會爲每個consumer group保留一些metadata信息—當前消費的消息的position,也即offset。這個offset由consumer控制。正常狀況下 consumer會在消費完一條消息後線性增長這個offset。固然,consumer也可將offset設成一個較小的值,從新消費一些消息。由於 offet由consumer控制,因此Kafka broker是無狀態的,它不須要標記哪些消息被哪些consumer過,不須要經過broker去保證同一個consumer group只有一個consumer能消費某一條消息,所以也就不須要鎖機制,這也爲Kafka的高吞吐率提供了有力保障。this

相關文章
相關標籤/搜索