今天看到研究Jafka的人還挺多的,比較優秀的是@FrankHui的Kafka系列文章,還有@rockybean 的博客。這兩個博客都寫的很詳細,條理清晰,圖文並茂,比起我這種蜻蜓點水,筆記式的記錄要好得多了。java
不過其實讀源碼每一個人側重點都不一樣,我仍是繼續記錄個人。算法
做爲一個實用主義者,我以爲讀源碼有幾種目的:mongodb
由於項目中也沒有用到Jafka,而是公司內部基於mongodb和netty寫的一個MQ,其實我卻是更傾向於3和2,而後再帶着想法回頭改進本身的。既然已經寫了是粗略解讀,卻是不怕人指責了。緩存
Producer的入口能夠看ProducerTest
類。網絡
根據配置,send()可使用sync和async方式。架構
BlockingChannel
是封裝了網絡鏈接的類,底層是NIO的SocketChannel
。async
這裏很有意思的是BlockingChannel
的send
方法:學習
<!-- lang: java --> public int send(BoundedByteBufferSend bufferSend) throws IOException { if (!isConnected()) { throw new ClosedChannelException(); } return bufferSend.writeCompletely(writeChannel); }
通常在涉及IO的開發中,咱們都是直接拿一個流,而後用統一的序列化方式,最後寫入buffer:ui
<!-- lang: java --> writeChannel.write(encoder.encode(object))
而Jafka裏的BoundedByteBufferSend
很顯然是Java裏面動做名詞化的實踐之一,bufferSend.writeCompletely(writeChannel)
的含義是:由BoundedByteBufferSend
來決定如何組織數據並寫入緩存,而不是在負責網絡IO的BlockingChannel
類裏統一作處理。這樣的方式引入了OO的特性,更爲優雅和易維護。一樣,Request
也使用了這樣的方法writeTo
。.net
MessageSet
是打包消息和傳輸的類。Jafka壓縮消息的算法目前只實現了GZip,GZip在JDK裏能夠經過GZIPInputStream
實現。
我的對於網絡協議這一塊比較感興趣,既然看到了Message,就順帶對Jafka的傳輸協議進行一下分析。Jafka所用的協議應該是徹底兼容Kafka的。
在Jafka裏,全部的請求都會首先帶上4個字節的長度,而後纔是內容(代碼參考BoundedByteBufferSend
裏的sizeBuffer
和buffer
):
對於Producer,Producer的message格式以下(代碼參考MessageSet.createByteBuffer()
):
在不壓縮的狀況下,消息仍然是按照4byte長度+內容的方式發送。而壓縮是將全部消息混合壓縮的。