Kafka的消息格式

Commit Log

Kafka儲存消息的文件被它叫作log,按照Kafka文檔的說法是:java

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit logapache

這反應出來的Kafka的行爲是:消息被不斷地append到文件末尾,並且消息是不可變的。數組

這種行爲源於Kafka想要實現的功能:高吞吐量,多副本,消息持久化。這種簡單的log形式的文件結構可以更好地實現這些功能,不過也會在其它方面有所欠缺,好比檢索消息的能力。緩存

而Kafka的行爲也決定了它的消息格式。對於Kafka來講,消息的主體部分的格式在網絡傳輸中和磁盤上是一致的,也就是說消息的主體部分能夠直接從網絡讀取的字節buffer中寫入到文件(部分狀況下),也能夠直接從文件中copy到網絡,而不須要在程序中再加工,這有利於下降服務器端的開銷,以及提升IO速度(好比使用zero-copy的傳輸)。服務器

這也就決定了Kafka的消息格式必須是適於被直接append到文件中的。固然啥均可以append到文件後面,問題在於怎麼從文件中拆分出來一條條記錄。網絡

記錄的劃分以及消息的格式

對於日誌來講,一條記錄以"\n"結尾,或者經過其它特定的分隔符分隔,這樣就能夠從文件中拆分出一條一條的記錄,不過這種格式更適用於文本,對於Kafka來講,須要的是二進制的格式。因此,Kafka使用了另外一種經典的格式:在消息前面固定長度的幾個字節記錄下這條消息的大小(以byte記),因此Kafka的記錄格式變成了:app

Offset MessageSize Messageide

消息被以這樣格式append到文件裏,在讀的時候經過MessageSize能夠肯定一條消息的邊界。性能

須要注意的是,在Kafka的文檔以及源碼中,消息(Message)並不包括它的offset。Kafka的log是由一條一條的記錄構成的,Kafka並無給這種記錄起個專門的名字,可是須要記住的是這個「記錄」並不等於"Message"。Offset MessageSize Message加在一塊兒,構成一條記錄。而在Kafka Protocol中,Message具體的格式爲fetch

Message => Crc MagicByte Attributes Key Value
   Crc => int32
   MagicByte => int8
   Attributes => int8
   Key => bytes
   Value => bytes

各個部分的含義是

Field

Description

Attributes

This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0.

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.

 

MessageSet

之因此要強調記錄與Message的區別,是爲了更好地理解MessageSet的概念。Kafka protocol裏對於MessageSet的定義是這樣的

MessageSet => [Offset MessageSize Message]
   Offset => int64
   MessageSize => int32

也就是說MessageSet是由多條記錄組成的,而不是消息,這就決定了一個MessageSet實際上不須要藉助其它信息就能夠從它對應的字節流中切分出消息,而這決定了更重要的性質:Kafka的壓縮是以MessageSet爲單位的。而以MessageSet爲單位壓縮,決定了對於壓縮後的MessageSet,不須要在它的外部記錄這個MessageSet的結構,也就決定了Kafka的消息是能夠遞歸包含的,也就是前邊"value"字段的說明「Kafka supports recursive messages in which case this may itself contain a message set"。

具體地說,對於Kafka來講,能夠對一個MessageSet作爲總體壓縮,把壓縮後獲得的字節數組做爲一條Message的value。因而,Message既能夠表示未壓縮的單條消息,也能夠表示壓縮後的MessageSet。

壓縮後的消息的讀取

就看Message頭部的Attributes裏的壓縮格式標識。說到這個,得說下遞歸包含的事情,理論上,一個壓縮的的MessageSet裏的一個Message可能會是另外一個壓縮後的MessageSet,或者包含更深層的MessageSet。可是實際上,Kafka中的一個Message最多隻含有一個MessageSet。從Message中讀取MessageSet的邏輯,能夠在ByteBufferMessageSet的internalIterator方法中找到:

        if(isShallow) { //是否要進行深層迭代
          new MessageAndOffset(newMessage, offset)
        } else { //若是要深層迭代的話
          newMessage.compressionCodec match {
            case NoCompressionCodec =>
              innerIter = null
              new MessageAndOffset(newMessage, offset) //若是這個Message沒有壓縮,就直接把它做爲一個Message返回
            case _ =>
              innerIter = ByteBufferMessageSet.deepIterator(newMessage) //若是這個Message採用了壓縮,就對它進行深層迭代
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }

而ByteBufferMessageSet的deepIterator方法就是對這個Message的value進行解壓,而後從中按照Offset MessageSize Message的格式讀取一條條記錄,對於此次讀取的Message,就再也不進行深層迭代了。下面是deepIterator的makeNext方法,它被不斷調用以生成迭代器的元素

      override def makeNext(): MessageAndOffset = {
        try {
          // read the offset
          val offset = compressed.readLong()
          // read record size
          val size = compressed.readInt()

          if (size < Message.MinHeaderSize)
            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")

          // read the record into an intermediate record buffer
          // and hence has to do extra copy
          val bufferArray = new Array[Byte](size)
          compressed.readFully(bufferArray, 0, size)
          val buffer = ByteBuffer.wrap(bufferArray)

          val newMessage = new Message(buffer)

          // the decompressed message should not be a wrapper message since we do not allow nested compression
          new MessageAndOffset(newMessage, offset)
        } catch {
          case eofe: EOFException =>
            compressed.close()
            allDone()
          case ioe: IOException =>
            throw new KafkaException(ioe)
        }
      }

KAFKA-1718

至於一個MessageSet中不能包含多個壓縮後的Message(壓縮後的Message也就是以壓縮後的MessageSet做爲value的Message),Kafka Protocol中是這麼說的

The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).

KAFKA-1718就是在Protocol裏添加這麼一個特殊說明的緣由。事情是這樣的:

報各這個問題的人是Go語言client的做者,他發現本身發的Message明顯沒有過大,可是發生了MessageSizeTooLargeException。後來跟其它人討論,發現是由於broker端在調用Log.append時,會把傳送給這個方法的MessageSet解壓開,而後再組合成一個壓縮後的MessageSet(ByteBufferMessageSet)。而Go語言的客戶端發送的MessageSet中包含了多個壓縮後的Message,這樣即便發送時的Message不會超過message.max.bytes的限制,可是broker端再次生成的Message就超過了這個限制。因此,Kafka Protocol對這種狀況作了特殊說明:The outer MessageSet should contain only one compressed "Message"。

Compressed Message的offset

即然能夠把壓縮後的MessageSet做爲Message的value,那麼這個Message的offset該如何設置呢?

這個offset的值只有兩種可能:1, 被壓縮的MessageSet裏Message的最大offset; 2, 被壓縮的MessageSet裏Message的最小offset.

這兩種取值沒有功能的不一樣,只有效率的不一樣。

因爲FetchRequest協議中的offset是要求broker提供大於等於這個offset的消息,所以broker會檢查log,找到符合條件的,而後傳輸出去。那麼因爲FetchRequest中的offset位置的消息可位於一個compressed message中,因此broker須要肯定一個compressed Message是否須要被包含在respone中。

  • 若是compressed Message的offset是它包含的MessageSet的最小offset。那麼,咱們對於這個Message是否應包含在response中,沒法給出"是」或"否「的回答。好比FetchRequest中指明的開始讀取的offset是14,而一個compressed Message的offset是13,那麼這個Message中可能包含offset爲14的消息,也可能不包含。
  • 若是compressed Message的offset是它包含的MessageSet的最大offset,那麼,能夠根據這個offset肯定這個Message「不該該」包含在response中。好比FetchRequest中指明的開始讀取的offset是14,那麼若是一個compressed Message的offset是13,那它就不應被包含在response中。而當咱們順序排除這種不符合條件的Message,就能夠找到第一個應該被包含在response中的Message(壓縮或者未壓縮), 從它開始讀取。

在第一種狀況下(最小offset),咱們儘管能夠經過連續的兩個Message肯定第一個Message的offset範圍,可是這樣在讀取時須要在讀取第二個Message的offset以後跳回到第一個Message,  這一般會使得最近一次讀(也就讀第二個offset)的文件系統的緩存失效。並且邏輯比第二種狀況更復雜。在第二種狀況下,broker只須要找到第一個其offset大於或等於目標offset的Message,從它能夠讀取便可,並且也一般能利用到文件系統緩存,由於offset和消息內容有可能在同一個緩存塊中。

在處理FetchRequest時,broker的邏輯也正是如此。對FetchRequest的處理會調用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,而後調用到LogSegment的read方法,它的以後的調用有不少,全部不貼代碼了,它的註釋說明了讀取的邏輯

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified

即,返回的MessageSet的第一條Message的offset >= startOffset。

而在broker給compressed Message賦予offset時,其邏輯也是賦予其包含的messages中的最大offset。這段邏輯在ByteBufferMessageSet的create方法中:

      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //建立壓縮流
        try {
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement //offsetCounter是一個AtomicLong,使用它的當前值做爲這條Message的offset,而後+1做爲下一條消息的offset
            output.writeLong(offset)//寫入這條日誌記錄的offset
            output.writeInt(message.size)//寫入這條日誌記錄的大小
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //寫入這條記錄的Message
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)//以最後一個Message的offset做爲這個compressed Message的offset

Validate Message

什麼須要驗證?

先看一下消息的哪些特徵須要被驗證。

首先,網絡傳輸過程當中,數據可能會產生錯誤,即便是寫在磁盤上的消息,也可能會因爲磁盤的問題產生錯誤。所以,broker對接收到的消息須要驗證其完整性。這裏的消息就是前邊協議裏定義的Message。對於消息完整性的檢測,是使用CRC32校驗,可是並非對消息的全部部分計算CRC,而是對Message的Crc部分之後的部分,不包括記錄的offset和MessageSize部分。把offset和MessageSize加到CRC計算中,能夠對完整性有更強的估證,可是壞處在於這兩個部分在消息由producer到達broker之後,會被broker重寫,所以若是把它們計算在crc裏邊,就須要在broker端從新計算crc32,這樣會帶來額外的開銷。

CRC32沒有檢測出錯誤的機率在0.0047%如下,加上TCP自己也有校驗機制,不能檢測出錯誤的機率就很小了(這個還須要再仔細算一下)。

除了消息的完整性,還須要對消息的合規性進行檢驗,主要是檢驗offset是不是單調增加的,以及MessageSize是超過了最大值。

這裏檢驗時使用的MessageSize就不是Message自己的大小了,而是一個記錄的大小,包括offset和MessageSize,這個也挺奇怪的,有必要非拉上這倆嗎?

並且在broker端檢驗producer發來的MessageSet時,也不必檢驗它的offset是不是單調增加的呀,畢竟leader還要對Message的offset從新賦值。而follower是從leader處拉取的,若是網絡或者磁盤出錯,經過對offset的單調性檢查也可能會漏掉出錯了的記錄,對於consumer來講也是同理。因此這裏有點奇怪。

什麼時候須要驗證?

在broker把收到的producer request裏的MessageSet append到Log以前,以及consumer和follower獲取消息以後,都須要進行校驗。

這種狀況分紅兩種:

1. broker和consumer把收到的消息append到log以前

2. consumser收到消息後

第一種狀況都是在調用Log#append時進行檢驗的。

如何驗證?

先看下Log#append的方法聲明

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo

在replica的fetcher線程調用append方法時,會把assignOffsets設成false,而leader處理produce request時,會把assignOffsets設成true。

下面append方法的一部分代碼

    val appendInfo = analyzeAndValidateMessageSet(messages) //驗證消息
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者殘缺的消息

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset 

       if(assignOffsets) { //若是須要從新賦予offset
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
          try {
            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //驗證消息而且賦予offset
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit 對壓縮後消息從新驗證MessageSize是否超過了容許的最大值
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

注意到對MessageSize驗證了兩次,第二次是對從新壓縮後的消息。KAFKA-1718裏提到MessageSizeToLargeException,就是在這時候檢測出來的。

初步檢驗:analyzeAndValidateMessageSet

具體的檢驗消息完整性和offset單調增加的邏輯在analyzeAndValidateMessageSet方法裏。這個方法的實現裏,須要注意幾點:

  1. 它是使用ByteBufferMessageSize的shallowIterator來對這個MessageSet的消息進行迭代,這也意味着並不會對compressed message裏邊的MessageSet解壓後再進行檢驗,而是把comprssed message做爲單個Message進行檢驗。
  2. 它計算checksum時,是計算的MagicByte及其之後的內容。
     def computeChecksum(): Long = 
        CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)

     

  3. 它比較的是entrySize與MaxMessageSize的大小,來肯定這個消息是否太大
      def entrySize(message: Message): Int = LogOverhead + message.size
    
    ---------------------------------
    
      val MessageSizeLength = 4
      val OffsetLength = 8
      val LogOverhead = MessageSizeLength + OffsetLength

     

  4. 它返回的LogAppendInfo中會包括一個targetCodec,指明這個MessageSet將要使用的壓縮方式。leader處理produce request時,將使用這個壓縮方式從新壓縮整個MessageSet。
        val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

    config.compressionType就是broker配置裏的compression.type的值,若是它是「producer", 就會使用producer request使用壓縮方式,不然就使用config.compressionType指明的壓縮方式。注意若是一個MessageSet裏的Message採用了不一樣的壓縮方式,最後被當成sourceCodec的是最後一個壓縮了的消息的壓縮方式。

再次檢驗而且賦予offset :validateMessagesAndAssignOffsets

只有leader處理produce request時,會調用ByteBufferMessageSet的這個方法。 它不會檢測analyzeAndValidateMessageSet已經檢測的內容,可是會把這個MessageSet進行深度遍歷(即若是它裏邊的消息是壓縮後,就把這個消息解壓開再遍歷),這樣它就能作analyzeAndValidateMessageSet不能進行的檢測:對於compacted topic檢測其key是否爲空,若是爲空就拋出InvalidMessageException。

另外,它會把深度遍歷後得到的Message放在一塊兒從新壓縮。

若是MessageSet的尾部不是完整的Message呢?

這是在獲取ByteBufferMessageSet的iternalIterator時候處理的。

      def makeNextOuter: MessageAndOffset = {
        // if there isn't at least an offset and size, we are done
        if (topIter.remaining < 12)
          return allDone()
        val offset = topIter.getLong()
        val size = topIter.getInt()
        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")

        // we have an incomplete message
        if(topIter.remaining < size)
          return allDone()
    .    ...
    }    

注意返回allDone()和拋出InvalidMessageException的時機。

  • 若是這個MessageSet剩下部分不到12bytes,那剩下的部分就是下一個MessageSet頭部的一部分,是無法處理的,也是沒辦法檢驗的,所以就返回allDone。
  • 若是夠12bytes,就能夠讀出offset和MessageSize。MessageSize至少會大於Message頭裏邊的那些crc、Attributes, MagicBytes等加起來的大小,所以若是MessageSize比這個還小,就確定是個entry有問題,因此就拋出異常。這裏的問題在於,即便MessageSet最後的那個Message是不完整的,只要MessageSize有問題,也會拋異常,而不是忽略這個不完整的Message。(這個多是沒考慮到,也多是有別的考慮,不過不管怎麼處理最後的這個不完整的Message,都有必定的道理)。

 consumer端的驗證

consumer(0.9)會檢查checksum,不過是能夠配置的,緣由正如config裏說的同樣。

    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

config的文檔說,檢查checksum是爲了"ensures no on-the-wire or on-disk corruption to the message occurred."即,爲了保證沒有在網絡傳輸出或者磁盤存儲時出現了消息的損壞。可是checksum計算時會帶來開銷,因此追求最佳性能,能夠關掉checksum的檢查。


 

下面來看一下幾個與消息格式相關的KIP。爲何須要這些改變呢?爲何以前沒有實現這些改變呢?都是由於各類折衷吧,需求與性能折衷,需求與實現所需的工做量的折衷……

下面的幾個KIP可能會一塊兒加上去,畢竟都是對消息格式的修改,不能搞衝突了。

KIP-31 - Move to relative offsets in compressed message sets

前邊提到了,在leader收到ProduceRequet以後,它會解壓開compressed message(也就是是這個KIP裏的compressed messageset,這兩說說法的確有些亂),而後給裏邊包含的message set的每條消息從新賦予offset。這個作法也是應該的,乍一看也沒什麼很差。可是問題在於,不只是直接改個offset這麼簡單,在改完以後,須要從新壓縮這些消息,還要計算。這麼一搞,開銷就大了。KIP-31就是想把這部分的性能損失降下來。(這個KIP已是accepted狀態)

作法是把在一個compressed message set裏邊的每一個message的offset裏記下當前message相對於外層的wrapper message的偏移。用漢語說這個意思比較費勁,KIP裏這麼說

When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.

When broker receives a compressed message, it only needs to 

    1. Decompress the message to verify the CRC and relative offset.
    2. Set outer message's base offset. The outer message's base offset will be the offset of the last inner message.  (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)

注意,這個wrapper message裏記的base offset, 是它所含的message set裏的最後一個message的offset。這個和當前的compressed message的offset是一致的。

而後當broker收到一個壓縮後的消息時,它只須要

  • 驗證CRC與realtive offset的正確性
  • 從新設定外層消息的offset,也就是base offset。

KIP-32 - Add timestamps to Kafka message

在消息里加時間戳。須要注意的是,這個KIP還在討論中(如下的內容是基於2016年1月7日的版本)。不像上一個已經肯定了。

(俺是以爲這個事情早該作了……)

首先,來看一下動機,這個提有意思

Motivation

This KIP tries to address the following issues in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).
  3. Some use cases such as streaming processing needs a timestamp in messages.

說的是這幾個緣由

1. Log retention會不靠譜。當前log retention是在log segment層面作的,是按照log segment的最後修改時間肯定是否要刪除一個log segment. 可是,當replica重分配發生時,新被分配的這個replica的log segment的修改時間會被設成當前時間。這麼一來,它就不能被按照log retention想要作的那樣(其實是想把一段時間以前的消息刪除)被刪除。

2. 因爲和1一樣的緣由,對於一個新建立的replica(意思應該是移動位置的replica, 並非增長分區後新加的replica)log rolling有時候也會不靠譜。

3. 有些場景中須要消息含有時間戳,好比流處理。

感受,貌似第三個緣由纔是決定性的,擁抱流處理。

接口的變化

準備在Message里加入timestamp字段

準備增長兩個配置

  • message.timestamp.type 能夠選CreateTime或者LogAppendTime,CreateTime就是這條消息生成的時間,是在producer端指定的。LogAppendTime就是append到log的時間(實現細節沒有說明)。
  • max.message.time.difference.ms 若是選擇了CreateTime, 那麼只有當createTime和broker的本地時間相差在這個配置指定的差距以內,broker纔會接受這條消息。

糾結之處

以前關於這個KIP的討論主要是關於使用哪一個時間, 是使用LogAppendTime(broker time),仍是CreateTime(application time)。

兩種都有利有弊:

The good things about LogAppendTime are: 使用LogAppendTime的好處在於

  1. Broker is more robust. Broker比起用戶程序更健壯(更不容易出錯,好比用戶程序可能有bug,致使CreateTime設置的不正確,想想KIP-33,若是錯得離譜,索引怎麼建?)
  2. Monotonically increasing. LogAppendTime是單調增加的。(可是,follower收到的消息的timestamp該怎麼設呢?若是不用leader帶來的,就不能肯定是否monotonically increasing)
  3. Deterministic behavior for log rolling and retention.log rolling和retention的行爲是肯定性的。(若是按消息裏的這個timestamp來決定這兩個操做的行爲,那麼讓用戶指定timestamp的確挺危險的)
  4. If CreateTime is required, it can always be put into the message payload.若是須要CreateTime,能夠加到消息的內容裏。(這個的確是……)

The good things about CreateTime are: 使用CreateTime的好處是

  1. More intuitive to users. 更符合用戶的思惟(用戶固然是想使用本身填進去的時間)。
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.用戶可能更但願用消息被建立的時間來決定log retention的行爲,而不是消息進行處理管道的時間。
  3. Immutable after entering the pipeline.這樣,消息的timestamp在進入管道後就不會再改變了。

在俺看來,這兩個選擇的確挺糾結的。用戶確定是想用本身產生消息的時間,否則很難準確地找到一條消息。可是,若是使用用戶指定的時間,broker端的行爲就變得複雜了,好比,若是用戶指定的時間不是單調遞增的,該怎麼建時間索引。可是用戶產生畸形的時間,倒能夠經過配置裏max.message.time.difference.ms來控制。或許能夠加另外一個配置,容許broker在必定範圍內修改CreateTime,好比最多能夠更改1000ms。這樣就能即便消息的timestamp單調增加,也能使用戶對消息的時間的估計比較準確。不過,這樣可能就須要讓broker time的含義變成broker收到消息時間,而不是append到log的時間。不然就難以肯定什麼時候該拒絕沒法在指定範圍內修改timestamp的消息。

 

KIP-33 - Add a time based log index

動機:

當前按照時間戳查找offset獲得的結果是很是粗粒度的,只能在log segment的級別。(對於reassigned replica就差得沒譜了。)因此這個KIP提議建一個基於時間的對日誌的索引,來容許按timestamp搜索消息的結果更準確。

這個KIP和KIP-32是緊密相關的。這倆KIP都在討論過程當中。

相關文章
相關標籤/搜索