本文原地址 https://blog.lpflpf.cn/passages/nsqd-study-2/git
NSQ 消息隊列實現消息落地使用的是 FIFO 隊列。
實現爲 diskqueue , 使用包github.com/nsqio/go-diskqueue
,本文主要對diskqueue
的實現作介紹。
本文代碼來自於github.com/nsqio/go-diskqueue
實現的功能是一個FIFO的隊列,實現以下功能:github
diskqueue 是 BackendQueue (一個隊列所須要的接口) 的實現, 接口定義以下:golang
type BackendQueue interface { Put([]byte) error // 將一條消息插入到隊列中 ReadChan() chan []byte // 返回一個無緩衝的chan Close() error // 隊列關閉 Delete() error // 刪除隊列 (實際在實現時,數據仍保留) Depth() int64 // 返回讀延遲的消息量 Empty() error // 清空消息 (實際會刪除全部的記錄文件) }
對於須要原子操做的64bit 的字段,須要放在struct 的最前面,緣由請看學習總結第一條
數據結構中定義了 文件的讀寫位置、一些文件讀寫的控制變量,以及相關操做的channel.緩存
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 // 讀的位置 writePos int64 // 寫的位置 readFileNum int64 // 讀文件的編號 writeFileNum int64 // 寫文件的編號 depth int64 // 讀寫文件的距離 (用於標識隊列的長度) sync.RWMutex // instantiation time metadata name string // 標識隊列名稱,用於落地文件名的前綴 dataPath string // 落地文件的路徑 maxBytesPerFile int64 // 每一個文件最大字節數 minMsgSize int32 // 單條消息的最小大小 maxMsgSize int32 // 單挑消息的最大大小 syncEvery int64 // 每寫多少次刷盤一次 syncTimeout time.Duration // 至少多久會刷盤一次 exitFlag int32 // 退出標識 needSync bool // 若是 needSync 爲true, 則須要fsync刷新metadata 數據 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 // 下一次讀的位置 nextReadFileNum int64 // 下一次讀的文件number readFile *os.File // 讀 fd writeFile *os.File // 寫 fd reader *bufio.Reader // 讀 buffer writeBuf bytes.Buffer // 寫 buffer // exposed via ReadChan() readChan chan []byte // 讀channel // internal channels writeChan chan []byte // 寫 channel writeResponseChan chan error // 同步寫完以後的 response emptyChan chan int // 清空文件的channel emptyResponseChan chan error // 同步清空文件後的channel exitChan chan int // 退出channel exitSyncChan chan int // 退出命令同步等待channel logf AppLogFunc // 日誌句柄 }
初始化一個隊列,須要定義前綴名, 數據路徑,每一個文件的最大字節數,消息最大最小限制,以及刷盤頻次和最長刷盤時間,最後還有一個日誌函數數據結構
func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { d := diskQueue{ name: name, dataPath: dataPath, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), emptyResponseChan: make(chan error), exitChan: make(chan int), exitSyncChan: make(chan int), syncEvery: syncEvery, syncTimeout: syncTimeout, logf: logf, } // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) } go d.ioLoop() return &d }
能夠看出, 隊列中均使用不帶cache 的chan,消息只能阻塞處理。函數
d.retrieveMetaData()
是從文件中恢復元數據。oop
d.ioLoop()
是隊列的事件處理邏輯,後文詳細解答學習
文件名 "name" + .diskqueue.%06d.dat
其中, name 是 topic, 或者topic + channel 命名.
數據採用二進制方式存儲, 消息大小+ body 的形式存儲。this
ioLoop 函數,作全部時間處理的操做,包括:atom
func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 var r chan []byte // 定時器的設置 syncTicker := time.NewTicker(d.syncTimeout) for { // 若到達刷盤頻次,標記等待刷盤 if count == d.syncEvery { d.needSync = true } if d.needSync { err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } count = 0 } // 有可讀數據,而且當前讀chan的數據已經被讀走,則讀取下一條數據 if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } r = d.readChan } else { // 若是無可讀數據,那麼設置 r 爲nil, 防止將dataRead數據重複傳入readChan中 r = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed // 若是讀chan 被寫入成功,則會修改讀的偏移 d.moveForward() case <-d.emptyChan: // 清空全部文件,並返回empty的結果 d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: // 寫msg count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: // 到刷盤時間,則修改needSync = true if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: goto exit } } exit: d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name) syncTicker.Stop() d.exitSyncChan <- 1 }
須要注意的點:
文件名: "name" + .diskqueue.meta.dat
其中, name 是 topic, 或者topic + channel 命名.
metadata 數據包含5個字段, 內容以下:
depth\nreadFileNum,readPos\nwriteFileNum,writePos
當服務關閉後,metadata 數據將保存在文件中。當服務再次啓動時,將從文件中將相關數據恢復到內存中。
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
緣由 緣由在golang 源碼 sync/atomic/doc.go 中
// On ARM, x86-32, and 32-bit MIPS, // it is the caller's responsibility to arrange for 64-bit // alignment of 64-bit words accessed atomically. The first word in a // variable or in an allocated struct, array, or slice can be relied upon to be // 64-bit aligned.
buffer_pool.go
文件中, 簡單實現了 bytes.Buffer 的對象池,減小了gc 壓力