看過前面幾篇博客的盆友可能會問,逼逼了這麼多還不知道消息到底存到哪兒了,分明標題黨嘛。這一次咱們就來看與存儲切實相關的底層操做類FileMessageSet。它是MessageSet的一個子類,操做消息和文件之間的讀寫操做。想一想咱們也知道,這特麼就是要寫增刪改查啊。這一次的代碼確實沒啥好說的,可是FileMessageSet確是比較重要的一個類,仍是簡短講一下吧。ide
消息的增刪改查scala
進行必要的檢查,好比是不是指定的消息格式(檢查Magic值)設計
進行消息格式的轉換code
對於最核心的功能——增刪改查,咱們在這裏進一步展開。首先FileMessageSet只處理最外層的消息,而不考慮嵌套的消息,嵌套消息會移交給以前的ByteBufferMessageSet處理。某種程度上,咱們也能夠把ByteBufferMessageSet看作是嵌套消息。orm
FileMessageSet的刪除也分爲兩種,一種是從特定位置截斷,一種是直接刪除整個文件。其查詢主要是從消息的序號也就是offset得到其在文件中的位置。其增長只容許向尾部追加,若想在中間添加,必須先截斷。對象
咱們列一下幾個重要的原子操做吧rem
read(buffer,position,length),read(position,length):FileMessageSetget
writeTo(channel,position,size)博客
truncate(size)it
search(offset):position
close
flush
FileMessageSet使用FileChannel來進行讀寫,咱們的操做依賴於position進行,須要首先定位。一樣,FileMessageSet容許支持切片,也就是截取文件中的一部分,指定start和end。可是這樣每次檢查末尾都須要考慮end了。
這裏首先要注意的第一點是channel的遊標應該始終定位在set的尾部,這是爲了保證寫入是順序的,因此在初始化的時候就應該將遊標移到尾部。
第二點是在關閉channel的時候須要先作flush而後截斷。這一點可能不太好理解,這裏舉個例子,若是我使用了分片,並在位置end後寫入了一條新消息,因爲必須保證消息是有序的,因此後面全部的消息必須丟棄。這也是保證消息的順序寫特性。
def close() { flush() trim() channel.close() }
第三點是迭代的過程,這裏面幾乎全部的原子操做均是從遍歷實現的,遍歷中須要進行較多的檢查操做,主要是如下幾點。
若是當前讀取的messageSize小於最小的消息頭大小,說明消息出現錯誤
若是當前讀取的messageSize大於剩餘的容量,說明最後一條消息不完整
若是剩下的容量小於offsetSize+MessageSizeLength,說明已經沒有消息了
可是這裏的容量須要同時考慮指定的end和channel的結尾,下面以生成迭代器爲例。
override def makeNext(): MessageAndOffset = { //最後一條消息出如今end以後 if(location + sizeOffsetLength >= end) return allDone() // read the size of the item sizeOffsetBuffer.rewind() channel.read(sizeOffsetBuffer, location) //最後一條消息出如今下一文件中 if(sizeOffsetBuffer.hasRemaining) return allDone() sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() //最後一條消息被end截斷或消息大小出現問題 if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end) return allDone() //消息過大 if(size > maxMessageSize) throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) channel.read(buffer, location + sizeOffsetLength) //最後一條消息被文件截斷 if(buffer.hasRemaining) return allDone() buffer.rewind() // increment the location and return the item location += size + sizeOffsetLength new MessageAndOffset(new Message(buffer), offset) }
第四條是追加是以ByteBufferMessageSet爲單位的,這主要是將嵌套消息和通常消息還有批量寫入統一在一個方法下。
第五條是一個有趣的代碼細節
def delete(): Boolean = { CoreUtils.swallow(channel.close()) file.delete() } def swallow(log: (Object, Throwable) => Unit, action: => Unit) { try { action } catch { case e: Throwable => log(e.getMessage(), e) } }
這裏將代碼塊包裹在try catch中,經過這種方法調用的形式,很是簡潔優美,有點相似於使用AOP收集異常,值得借鑑。
寫到這兒,讓咱們來回顧一下整個消息存儲的內容並整理出完整的流程吧。
首先FileMessageSet讀取最外層消息
若該消息是嵌套消息,則生成ByteBufferMessageSet解壓縮並生成原子消息集
經過調用message自身的方法進行檢驗和獲取基本信息好比消息格式
經過MessageAndMeta加上譯碼器得到key-value對象
首先MessageWriter寫入key-value和消息頭生成buffer
對於嵌套消息使用剛剛的buffer生成 ByteBufferMessageSet並convert壓縮成新的ByteBufferMessageSet
再使用FileMessageSet追加ByteBUfferMessageSet