Kafka消息存儲之FileMessageSet

摘要

看過前面幾篇博客的盆友可能會問,逼逼了這麼多還不知道消息到底存到哪兒了,分明標題黨嘛。這一次咱們就來看與存儲切實相關的底層操做類FileMessageSet。它是MessageSet的一個子類,操做消息和文件之間的讀寫操做。想一想咱們也知道,這特麼就是要寫增刪改查啊。這一次的代碼確實沒啥好說的,可是FileMessageSet確是比較重要的一個類,仍是簡短講一下吧。ide

FileMessageSet的功能

  • 消息的增刪改查scala

  • 進行必要的檢查,好比是不是指定的消息格式(檢查Magic值)設計

  • 進行消息格式的轉換code

對於最核心的功能——增刪改查,咱們在這裏進一步展開。首先FileMessageSet只處理最外層的消息,而不考慮嵌套的消息,嵌套消息會移交給以前的ByteBufferMessageSet處理。某種程度上,咱們也能夠把ByteBufferMessageSet看作是嵌套消息。orm

FileMessageSet的刪除也分爲兩種,一種是從特定位置截斷,一種是直接刪除整個文件。其查詢主要是從消息的序號也就是offset得到其在文件中的位置。其增長只容許向尾部追加,若想在中間添加,必須先截斷。對象

咱們列一下幾個重要的原子操做吧rem

  • read(buffer,position,length),read(position,length):FileMessageSetget

  • writeTo(channel,position,size)博客

  • truncate(size)it

  • search(offset):position

  • close

  • flush

FileMesssage的設計

FileMessageSet使用FileChannel來進行讀寫,咱們的操做依賴於position進行,須要首先定位。一樣,FileMessageSet容許支持切片,也就是截取文件中的一部分,指定start和end。可是這樣每次檢查末尾都須要考慮end了。

這裏首先要注意的第一點是channel的遊標應該始終定位在set的尾部,這是爲了保證寫入是順序的,因此在初始化的時候就應該將遊標移到尾部。

第二點是在關閉channel的時候須要先作flush而後截斷。這一點可能不太好理解,這裏舉個例子,若是我使用了分片,並在位置end後寫入了一條新消息,因爲必須保證消息是有序的,因此後面全部的消息必須丟棄。這也是保證消息的順序寫特性。

def close() {
        flush()
        trim()
        channel.close()
      }

第三點是迭代的過程,這裏面幾乎全部的原子操做均是從遍歷實現的,遍歷中須要進行較多的檢查操做,主要是如下幾點。

  • 若是當前讀取的messageSize小於最小的消息頭大小,說明消息出現錯誤

  • 若是當前讀取的messageSize大於剩餘的容量,說明最後一條消息不完整

  • 若是剩下的容量小於offsetSize+MessageSizeLength,說明已經沒有消息了

可是這裏的容量須要同時考慮指定的end和channel的結尾,下面以生成迭代器爲例。

override def makeNext(): MessageAndOffset = {
            //最後一條消息出如今end以後
            if(location + sizeOffsetLength >= end)
              return allDone()
    
    
            // read the size of the item
            sizeOffsetBuffer.rewind()
            channel.read(sizeOffsetBuffer, location)
    
            //最後一條消息出如今下一文件中
            if(sizeOffsetBuffer.hasRemaining)
              return allDone()
    
            sizeOffsetBuffer.rewind()
            val offset = sizeOffsetBuffer.getLong()
            val size = sizeOffsetBuffer.getInt()
    
            //最後一條消息被end截斷或消息大小出現問題
            if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
              return allDone()
          //消息過大
            if(size > maxMessageSize)
              throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
    
            // read the item itself
            val buffer = ByteBuffer.allocate(size)
            channel.read(buffer, location + sizeOffsetLength)
    
            //最後一條消息被文件截斷
            if(buffer.hasRemaining)
              return allDone()
            buffer.rewind()
    
            // increment the location and return the item
            location += size + sizeOffsetLength
            new MessageAndOffset(new Message(buffer), offset)
          }

第四條是追加是以ByteBufferMessageSet爲單位的,這主要是將嵌套消息和通常消息還有批量寫入統一在一個方法下。

第五條是一個有趣的代碼細節

def delete(): Boolean = {
    CoreUtils.swallow(channel.close())
    file.delete()
  }

def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
    try {
      action
    } catch {
      case e: Throwable => log(e.getMessage(), e)
    }
  }

這裏將代碼塊包裹在try catch中,經過這種方法調用的形式,很是簡潔優美,有點相似於使用AOP收集異常,值得借鑑。

消息讀入的過程

寫到這兒,讓咱們來回顧一下整個消息存儲的內容並整理出完整的流程吧。

  1. 首先FileMessageSet讀取最外層消息

  2. 若該消息是嵌套消息,則生成ByteBufferMessageSet解壓縮並生成原子消息集

  3. 經過調用message自身的方法進行檢驗和獲取基本信息好比消息格式

  4. 經過MessageAndMeta加上譯碼器得到key-value對象

消息寫入的過程

  1. 首先MessageWriter寫入key-value和消息頭生成buffer

  2. 對於嵌套消息使用剛剛的buffer生成 ByteBufferMessageSet並convert壓縮成新的ByteBufferMessageSet

  3. 再使用FileMessageSet追加ByteBUfferMessageSet

相關文章
相關標籤/搜索