上一篇主要說了一下nsq是如何保證消息被消費端成功消費,大概提了一下消息的持久化,--mem-queue-size
設置爲 0,全部的消息將會存儲到磁盤。
總有人說nsq
的持久化問題,消除疑慮的方法就是閱讀原碼作benchmark測試,我的感受nsq
仍是很靠譜的。
nsq
本身實現了一個先進先出的消息文件隊列go-diskqueue是把消息保存到本地文件內,很值得分析一下他的實現過程。html
go-diskqueue
會啓動一個gorouting
進行讀寫數據也就是方法ioLoop
會根據你設置的參數來進行數據的讀寫,流程圖以下git
這個圖畫的也不是特別的準確
ioLoop
用的是select
並非if else
當有多個條件爲true
時,會隨機選一個進行執行github
nsq
生成的數據大體以下:
緩存
xxxx.diskqueue.meta.dat
元數據保存了未讀消息的長度,讀取和存入數據的編號和讀取位置
xxxx.diskqueue.編號.dat
消息保存的文件,每個消息的存儲:4Byte消息的長度+消息
oop
一些主要的參數和約束說明
這些參數的使用在後面的處理邏輯中會提到測試
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { // run-time state (also persisted to disk) // 讀取數據的位置 readPos int64 // 寫入數據的位置 writePos int64 // 讀取文件的編號 readFileNum int64 // 寫入文件的編號 writeFileNum int64 // 未處理的消息總數 depth int64 // instantiation time metadata // 每一個文件的大小限制 maxBytesPerFile int64 // currently this cannot change once created // 每條消息的最小大小限制 minMsgSize int32 // 每條消息的最大大小限制 maxMsgSize int32 // 緩存消息有多少條後進行寫入 syncEvery int64 // number of writes per fsync // 自動寫入消息文件的時間間隔 syncTimeout time.Duration // duration of time per fsync exitFlag int32 needSync bool // keeps track of the position where we have read // (but not yet sent over readChan) // 下一條消息的位置 nextReadPos int64 // 下一條消息的文件編號 nextReadFileNum int64 // 讀取的文件 readFile *os.File // 寫入的文件 writeFile *os.File // 讀取的buffer reader *bufio.Reader // 寫入的buffer writeBuf bytes.Buffer // exposed via ReadChan() // 讀取數據的channel readChan chan []byte //..... }
讀寫數據信息的元數據保存在xxxxx.diskqueue.meta.data文件內主要用到代碼裏的字段以下
未處理的消息總數 depth
讀取文件的編號 readFileNum
讀取數據的位置 readPos
寫入文件的編號 writeFileNum
寫入數據的位置 writePos
真實數據以下this
15 0,22 3,24
保存元數據信息atom
func (d *diskQueue) persistMetaData() error { // ... fileName := d.metaDataFileName() tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) // write to tmp file f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) // 元數據信息 _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", atomic.LoadInt64(&d.depth), d.readFileNum, d.readPos, d.writeFileNum, d.writePos) // 保存 f.Sync() f.Close() // atomically rename return os.Rename(tmpFileName, fileName) }
獲得元數據信息code
func (d *diskQueue) retrieveMetaData() error { // ... fileName := d.metaDataFileName() f, err = os.OpenFile(fileName, os.O_RDONLY, 0600) // 讀取數據並賦值 var depth int64 _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &depth, &d.readFileNum, &d.readPos, &d.writeFileNum, &d.writePos) //... atomic.StoreInt64(&d.depth, depth) d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos return nil }
ioLoop
中發現有數據寫入時,會調用writeOne
方法,把消息保存到文件內htm
select { // ... case dataWrite := <-d.writeChan: count++ d.writeResponseChan <- d.writeOne(dataWrite) // ...
func (d *diskQueue) writeOne(data []byte) error { var err error if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) // ... if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) // ... } } dataLen := int32(len(data)) // 判斷消息的長度是否合法 if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize) } d.writeBuf.Reset() // 寫入4字節的消息長度,以大端序保存 err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 寫入消息 _, err = d.writeBuf.Write(data) if err != nil { return err } // 寫入到文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) // ... // 計算寫入位置,消息數量加1 totalBytes := int64(4 + dataLen) d.writePos += totalBytes atomic.AddInt64(&d.depth, 1) // 若是寫入位置大於 單個文件的最大限制, 則持久化文件到硬盤 if d.writePos > d.maxBytesPerFile { d.writeFileNum++ d.writePos = 0 // sync every time we start writing to a new file err = d.sync() // ... } return err }
寫入完消息後,會判斷當前的文件大小是否已經已於maxBytesPerFile
若是大,就持久化文件到硬盤,而後從新打開一個新編號文件,進行寫入。
調用sync()
方法會持久化文件到硬盤,而後從新打開一個新編號文件,進行寫入。
有幾個地方調用會調用這個方法:
syncEvery
的值時,也就是初始化時設置的最大的條數。會調用sync()
syncTimeout
初始化時設置的同步時間間隔,若是這個時間間隔到了,而且寫入的文件條數>0的時候,會調用sync()
writeOne
方法,寫入完消息後,會判斷當前的文件大小是否已經已於maxBytesPerFile
若是大,會調用sync()
needSync
設置爲true
,ioLoop
會調用sync()
Close
的時候,會調用sync()
func (d *diskQueue) sync() error { if d.writeFile != nil { // 把數據 flash到硬盤,關閉文件並設置爲 nil err := d.writeFile.Sync() if err != nil { d.writeFile.Close() d.writeFile = nil return err } } // 保存元數據信息 err := d.persistMetaData() // ... d.needSync = false return nil }
元數據保存着 讀取文件的編號 readFileNum
和讀取數據的位置 readPos
而且diskQueue
暴露出了一個方法來,經過channel
來讀取數據
func (d *diskQueue) ReadChan() chan []byte { return d.readChan }
ioLoop
裏,當發現讀取位置小於寫入位置 或者讀文件編號小於寫文件編號,而且下一個讀取位置等於當前位置時纔會讀取一條數據,而後放在一個外部全局變量 dataRead
裏,並把 讀取的channel
賦值監聽 r = d.readChan
,當外部有人讀取了消息,則進行moveForward
操做
func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 var r chan []byte for { // ... if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.handleReadError() continue } } r = d.readChan } else { r = nil } select { // ... case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed d.moveForward() // ... } } // ... }
readOne
從文件裏讀取一條消息,4個bit的大小,而後讀取具體的消息。若是讀取位置大於最大文件限制,則close。在moveForward裏會進行刪除操做
func (d *diskQueue) readOne() ([]byte, error) { var err error var msgSize int32 // 若是readFile是nil,打開一個新的 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) // ... d.reader = bufio.NewReader(d.readFile) } err = binary.Read(d.reader, binary.BigEndian, &msgSize) // ... readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) totalBytes := int64(4 + msgSize) // ... d.nextReadPos = d.readPos + totalBytes d.nextReadFileNum = d.readFileNum // 若是讀取位置大於最大文件限制,則close。在moveForward裏會進行刪除操做 if d.nextReadPos > d.maxBytesPerFile { if d.readFile != nil { d.readFile.Close() d.readFile = nil } d.nextReadFileNum++ d.nextReadPos = 0 } return readBuf, nil }
moveForward
方法會查看讀取的編號,若是發現下一個編號 和當前的編號不一樣時,則刪除舊的文件。
func (d *diskQueue) moveForward() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos depth := atomic.AddInt64(&d.depth, -1) // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { // sync every time we start reading from a new file d.needSync = true fn := d.fileName(oldReadFileNum) err := os.Remove(fn) // ... } d.checkTailCorruption(depth)