Kafka消息存儲之ByteBufferMessageSet

摘要

MessageSet是Kafka在底層操做message很是重要的一個層級概念,從名稱上能夠看出來它是消息的集合體,可是代碼中的處理邏輯更多的是在考慮到嵌套消息的處理問題。MessageSet的主要功能是提供Message的順序讀和批量寫,操做的基本對象是MessageAndOffset。前端

MessageSet

首先咱們要講一講message在MessageSet中是如何分佈的,每一條消息的前端都會被加入一個Long來表示消息在set中的位置偏移,緊接着是一個Int來指明消息的大小。MessageSet正是經過讀取這些來分割消息的。java

同時MessageSet實現了Iterable接口,它最主要的兩個方法便是返回迭代器和寫入Channel,下面貼代碼。app

/** Write the messages in this set to the given channel starting at the given offset byte. 
    * Less than the complete amount may be written, but no more than maxSize can be. The number
    * of bytes written is returned */
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
  
  /**
   * Provides an iterator over the message/offset pairs in this set
   */
  def iterator: Iterator[MessageAndOffset]
  
  /**
   * Gives the total size of this message set in bytes
   */
  def sizeInBytes: Int

在這裏我主要想提幾個問題:ide

  • 爲何是實現iterable而不是直接實現iterator接口?函數

  • MessageSet理應能夠將更多的API集成到該虛類中,好比在特定Offset插入消息和消息集,刪除特定Offset的消息,該不應引入這些API呢?若是須要,應該引入到哪些程度?this

首先看看第一個問題,咱們不難Collection接口也是繼承Iterable的,由於Iterator接口的核心方法next()或者hasNext() 是依賴於迭代器的當前迭代位置的。 若是Collection直接實現Iterator接口,勢必致使集合對象中包含當前迭代位置的數據(指針)。 當集合在不一樣方法間被傳遞時,因爲當前迭代位置不可預置,那麼next()方法的結果會變成不可預知。設計

對於第二個問題,目前我所想到的是因爲有嵌套消息的存在,不管是插入或者刪除都須要通過複雜的檢查操做,而對於消息隊列來講,消息的插入和消費一定是順序的,且發生在頭和尾,將這樣的API設定加入到父虛類未免代價過高。不過這只是我我的的一點想法,不必定正確。指針

iterator的生成

接下來咱們看看ByteBufferMessageSet是如何實現順序讀的,咱們在這裏直接考慮最複雜的情形,即須要解析嵌套消息解壓縮的情形。code

1. 從buffer的position頭部讀取Long,獲得整個set的初始偏移
2.讀取一個Int,獲得第一個message的大小;進行檢查,若該大小小於規定的消息頭大小則出現錯誤,若buffer剩餘的大小小於該大小,則說明最後一條消息被截斷,解析完畢
3. 根據獲得的消息大小生成MessageAndOffset,若它是非壓縮的則直接做爲下一條消息;不然進入4.
4. 生成一個內迭代器負責嵌套消息中的消息迭代。這裏咱們須要注意幾點,一是要根據外層消息的時間戳和時間戳類型來修改內層消息的這兩個屬性;二是要注意偏移的轉換。偏移的轉換我會放在下面重點講一講。
5. 內迭代器首先從壓縮字節碼流裏解壓縮讀取全部的內層消息,並在外層請求next()時一一迭代,直到結束重置內迭代器,並返回外層迭代邏輯

偏移定位問題是咱們在這裏須要重點提到的問題,首先這個偏移到底指什麼:從設計上來說,有兩種選擇:1.相似於消息的序號,決定了消息是第幾條。2.消息在字節碼流中的起始位置。爲了更好的服務於上層讀取以後的處理邏輯,Kafka選擇了消息序號。可是這裏的主要問題是對於嵌套消息,它解壓縮以後,這些內部消息存儲的是相對位移(相對於外層的序號),須要修改它們的相對位移到絕對位移。另外嵌套消息的位移又該是什麼呢,如果僅僅考慮外層,那麼它解壓縮以後全部後面的消息位移都須要增長,因此不合理,一種可取的方式是選擇其內部消息的最後一條的絕對位移。對象

/**
RO = IO_of_a_message - IO_of_the_last_message
AO = AO_Of_Last_Inner_Message + RO


**/

override def makeNext(): MessageAndOffset = {
        messageAndOffsets.pollFirst() match {
          case null => allDone()
          case nextMessage@ MessageAndOffset(message, offset) =>
            if (wrapperMessage.magic > MagicValue_V0) {
              val relativeOffset = offset - lastInnerOffset
              val absoluteOffset = wrapperMessageOffset + relativeOffset
               /** 這裏作的很是巧妙,這個lastInnerOffset其實就是整個外層消息等價的內部相對位移**/
              new MessageAndOffset(message, absoluteOffset)
            } else {
              nextMessage
            }
        }
      }

構造函數和寫函數

MessageSet的寫並不困難,首先寫入偏移,再寫入大小,最後寫入本體。對於壓縮消息,則再給它加個頭部。可是構造函數的設計其實體現了這個類的功能,ByteBufferMessageSet到底會用在什麼地方呢?

  • 直接從Buffer中獲取原始數據,把它當作messageSet進行解析,也就是讀操做

  • 從一系列消息中組裝(可選總體的時間戳和magic值),最終寫入channel中

ByteBufferMessageSet最特別的用處是用來讀寫嵌套消息的,它爲嵌套消息設定相對偏移,檢查全部內部消息的magic值,在讀取的時候轉換內部消息的時間戳,對內部消息集作壓縮並加上嵌套消息頭。

看到這裏,構造函數的設計呼之欲出了。

  • 直接從Buffer或bytes中構造,主要完成讀操做

  • 從一系列消息中構造,能夠指定它們是否組成嵌套消息,也就是指定整個set的壓縮方式,是否壓縮;更重要的是指定偏移

  • 如果須要壓縮,就指定外部嵌套消息的消息頭相關屬性,這一部分就能夠參考Message的構造函數了

下面貼一下後者的構造函數代碼

private def create(offsetAssigner: OffsetAssigner,
                     compressionCodec: CompressionCodec,
                     wrapperMessageTimestamp: Option[Long],
                     timestampType: TimestampType,
                     messages: Message*): ByteBuffer = {
    if (messages.isEmpty)
      MessageSet.Empty.buffer
    else if (compressionCodec == NoCompressionCodec) {
// 不爲嵌套消息時,只是集合一系列消息,僅僅經過offsetAssigner指定預計偏移
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())
      buffer.rewind()
      buffer
    } else {
      val magicAndTimestamp = wrapperMessageTimestamp match {
        case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
        case None => MessageSet.magicAndLargestTimestamp(messages)
      }
      var offset = -1L
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))

// 寫嵌套消息頭
      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
        try {
          for (message <- messages) {
            offset = offsetAssigner.nextAbsoluteOffset()
            if (message.magic != magicAndTimestamp.magic)
              throw new IllegalArgumentException("Messages in the message set must have same magic value")
            // Use inner offset if magic value is greater than 0
            if (magicAndTimestamp.magic > Message.MagicValue_V0)

            //注意這裏,內部消息寫入的是相對位移
              output.writeLong(offsetAssigner.toInnerOffset(offset))
            else
              output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)

//注意這裏,寫嵌套消息大小和位移,嵌套消息位移是它最後一條內部消息的絕對位移
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  }

能夠說上面這段代碼幾乎就包括了整個ByteBufferMessageSet的設計目的,和讀寫方式,這一段大有深意啊。知道看懂這一段個人諸多疑惑才獲得解答。

驗證與校訂

這裏主要完成下面幾個任務:

  • 檢查時間戳與時間戳類型

  • 對於嵌套內層消息,須要檢查它是否有key

  • 能夠從新設定時間戳類型和時間戳並修改

  • 須要作偏移校訂,能夠爲整個Set設定總體的一個起始偏移,從新檢查全部消息的位移是否合理

Kafka 0.10.0的代碼中將這些功能混雜在一次遍歷中,對於含壓縮的MessageSet許多操做好比offset校訂等不可行只能返回一個新的解壓縮以後的MessageSet。我我的認爲這可能並非好的作法,應該巴這些功能區分獨立出來,必要的校檢作一塊,校訂作一塊,最後從新設定作一塊。下面以kafka 0.8.0的代碼爲例展現一下如何作偏移校訂。

/**
   * Update the offsets for this message set. This method attempts to do an in-place conversion
   * if there is no compression, but otherwise recopies the messages
   */
private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec):            ByteBufferMessageSet = {
    if(codec == NoCompressionCodec) {
      // do an in-place conversion
      var position = 0
      buffer.mark()
      while(position < sizeInBytes - MessageSet.LogOverhead) {
        buffer.position(position)
        buffer.putLong(offsetCounter.getAndIncrement())
        position += MessageSet.LogOverhead + buffer.getInt()
      }
      buffer.reset()
      this
    } else {
      // messages are compressed, crack open the messageset and recompress with correct offset
      val messages = this.internalIterator(isShallow = false).map(_.message)
      new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
    }
  }
相關文章
相關標籤/搜索