MessageWriter是Kafka進行消息寫的工具類,這一部分代碼卻是和整個系統設計沒有多大關係,可是從局部來看,有許多有意思的細節,因此也開一篇短博客來說一講。數組
首先讓咱們列出在消息寫的過程當中可能出現的變化狀況,也就是這個類的設計需求:緩存
輸入源不一樣,有bytes[] stream 基本的數據類型(Int,Long,Byte,bytes)等,均須要支持。網絡
寫入的數據大小不肯定,因此須要考慮自適應容量機制ide
須要必定的自動保障機制,好比在寫入數據後自動生成CRC並填充到頭部;自動計算大小並填充到頭部函數
咱們將這三個需求再切分一下層次,絕大部分的基本類型寫入均可以歸結爲字節的寫入,各類類型的寫入和自適應容量的功能較爲底層,能夠實現的更加普適,而相對的第三個需求則相對上層,能夠分開來實現。工具
實際上,Kafka也是這麼作的,MessagheWriter繼承了一個父類BufferingOutputStream,該類主要用於從各種輸入源中寫入數據到緩存,而後批量寫入Buffer中。性能
寫入的消息會先暫存在BufferingOutputStream內部,他的容量控制是經過字節數組來構成鏈表完成的,每一個字節數組均是定長的(長度由構造函數傳入),同時爲每一個字節數組配備一個遊標來標示已被寫入多少字節內容,同時採用一個引用代表當前正在寫的數組。下面就給出這種基本結構的定義。this
protected final class Segment(size: Int) { val bytes = new Array[Byte](size) var written = 0 var next: Segment = null def freeSpace: Int = bytes.length - written }
BufferingOutputStream的控制策略很是簡單,那就是當前的Segment寫滿,當即增長新的Segment。值得注意的是,這種寫入是不可逆的,就是當你回退後再寫入也會建立新的segment而非重用原有的segment。scala
那麼更加有效和複雜的控制策略是否值得被引入呢?Kafka採用這種簡單的策略是因爲消息的寫入是一次性的,一個序列的消息使用一個writer來寫入,而並不是複用writer。另外就是Kafka的性能更多地受限於網絡帶寬,因此它應該採起儘可能簡單的讀寫策略提升讀寫的效率而沒必要費盡心機來減小內存的建立和釋放(GC並非它的主要問題)。設計
下面咱們以byte數組的寫入爲例,上代碼:
override def write(b: Array[Byte], off: Int, len: Int) { if (off >= 0 && off <= b.length && len >= 0 && off + len <= b.length) { var remaining = len var offset = off while (remaining > 0) { if (currentSegment.freeSpace <= 0) addSegment() val amount = math.min(currentSegment.freeSpace, remaining) System.arraycopy(b, offset, currentSegment.bytes, currentSegment.written, amount) currentSegment.written += amount offset += amount remaining -= amount } } else { throw new IndexOutOfBoundsException() } }
對於基本類型的寫入,MessageWriter使用了位操做,一個字節一個字節的寫入,確保它們構建在字節寫入的基礎之上,這是一種漂亮的概括,咱們也以32位Int的寫入爲例。
private def writeInt(value: Int): Unit = { write(value >>> 24) write(value >>> 16) write(value >>> 8) write(value) }
咱們仍是先貼代碼再廢話吧
def write(key: Array[Byte] = null, codec: CompressionCodec, timestamp: Long, timestampType: TimestampType, magicValue: Byte)(writePayload: OutputStream => Unit): Unit = { withCrc32Prefix { // write magic value write(magicValue) // write attributes var attributes: Byte = 0 if (codec.codec > 0) attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte if (magicValue > MagicValue_V0) attributes = timestampType.updateAttributes(attributes) write(attributes) // Write timestamp if (magicValue > MagicValue_V0) writeLong(timestamp) // write the key if (key == null) { writeInt(-1) } else { writeInt(key.length) write(key, 0, key.length) } // write the payload with length prefix withLengthPrefix { writePayload(this) } } } private def withLengthPrefix(writeData: => Unit): Unit = { // get a writer for length value val lengthWriter = reserve(ValueSizeLength) // save current size val oldSize = size // write data writeData // write length value writeInt(lengthWriter, size - oldSize) }
從上面這段代碼能夠看出scala的with很是相似於Python的decrator,將代碼塊當作無返回無參的函數在with中進行調用。但我想提的是另外一個問題,那就是如何記錄以前的位置呢。咱們說過寫入的過程是不可逆的,寫入的遊標不能回退,可是咱們必須在寫完數據以後再寫入CRC,那麼咱們就須要相似於buffer的mark和reset機制同樣的東東。
可是咱們不能像buffer那樣直接移動遊標,由於咱們須要順利寫入下一條消息,移動遊標再復位實在代價太大。那咱們能不能提早把這一小段內存截取出來賦給另外一個引用,寫入的時候向新引用寫就好了,獨立於原有的數據寫入過程。MessageWriter就是這樣作的,好吧讓咱們介紹那所謂的一小段內存吧。
protected class ReservedOutput(seg: Segment, offset: Int, length: Int) extends OutputStream { private[this] var cur = seg private[this] var off = offset private[this] var len = length //預留的內存大小 override def write(value: Int) = { if (len <= 0) throw new IndexOutOfBoundsException() if (cur.bytes.length <= off) { cur = cur.next off = 0 } cur.bytes(off) = value.toByte off += 1 len -= 1 } }
可是大家必定看出了上面的問題了,這樣寫入不是將數據覆蓋了嗎,因此咱們在寫數據時須要先預留一部份內存,這時就須要跳過一部份內存空間了。
private def skip(len: Int): Unit = { if (len >= 0) { var remaining = len while (remaining > 0) { if (currentSegment.freeSpace <= 0) addSegment() val amount = math.min(currentSegment.freeSpace, remaining) currentSegment.written += amount remaining -= amount } } else { throw new IndexOutOfBoundsException() } }
預留操做在此處
def reserve(len: Int): ReservedOutput = { val out = new ReservedOutput(currentSegment, currentSegment.written, len) skip(len) out }
咱們再來看withLengthPrefix,先預留ValueSize大小的內存,而後寫入數據,最後計算整個的內存變化也就是寫入的數據的大小,寫入預留內存中,是否是很完美?