消息隊列 NSQ 源碼學習筆記 (二)

NSQ 消息隊列實現消息落地使用的是 FIFO 隊列。
實現爲 diskqueue , 使用包 github.com/nsqio/go-diskqueue ,本文主要對 diskqueue的實現作介紹。git

功能定位

  • 在NSQ 中, diskqueue 是一個實例化的 BackendQueue, 用於保存在內存中放不下的消息。使用場景如Topic 隊列中的消息,Channel 隊列中的消息
  • 實現的功能是一個FIFO的隊列,實現以下功能:
    • 支持消息的插入、清空、刪除、關閉操做
    • 能夠返回隊列的長度(寫和讀偏移的距離)
    • 具備讀寫功能,FIFO 的隊列

diskqueue 的實現

BackendQueue 接口以下:github

type BackendQueue interface {
    Put([]byte) error      // 將一條消息插入到隊列中
    ReadChan() chan []byte // 返回一個無緩衝的chan
    Close() error          // 隊列關閉
    Delete() error         // 刪除隊列 (實際在實現時,數據仍保留)
    Depth() int64          // 返回讀延遲的消息量
    Empty() error          // 清空消息 (實際會刪除全部的記錄文件)
}

數據結構

對於須要原子操做的64bit 的字段,須要放在struct 的最前面,緣由請看學習總結第一條
數據結構中定義了 文件的讀寫位置、一些文件讀寫的控制變量,以及相關操做的channel.golang

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms

    // run-time state (also persisted to disk)
    readPos      int64               // 讀的位置
    writePos     int64               // 寫的位置
    readFileNum  int64               // 讀文件的編號
    writeFileNum int64               // 寫文件的編號
    depth        int64               // 讀寫文件的距離 (用於標識隊列的長度)

    sync.RWMutex

    // instantiation time metadata
    name            string           // 標識隊列名稱,用於落地文件名的前綴 
    dataPath        string           // 落地文件的路徑
    maxBytesPerFile int64            // 每一個文件最大字節數
    minMsgSize      int32            // 單條消息的最小大小
    maxMsgSize      int32            // 單挑消息的最大大小
    syncEvery       int64            // 每寫多少次刷盤一次
    syncTimeout     time.Duration    // 至少多久會刷盤一次
    exitFlag        int32            // 退出標識
    needSync        bool             // 若是 needSync 爲true, 則須要fsync刷新metadata 數據

    // keeps track of the position where we have read
    // (but not yet sent over readChan)
    nextReadPos     int64            // 下一次讀的位置
    nextReadFileNum int64            // 下一次讀的文件number

    readFile  *os.File               // 讀 fd
    writeFile *os.File               // 寫 fd
    reader    *bufio.Reader          // 讀 buffer
    writeBuf  bytes.Buffer           // 寫 buffer

    // exposed via ReadChan()
    readChan chan []byte             // 讀channel

    // internal channels
    writeChan         chan []byte    // 寫 channel
    writeResponseChan chan error     // 同步寫完以後的 response
    emptyChan         chan int       // 清空文件的channel
    emptyResponseChan chan error     // 同步清空文件後的channel
    exitChan          chan int       // 退出channel
    exitSyncChan      chan int       // 退出命令同步等待channel

    logf AppLogFunc                  // 日誌句柄
}

初始化一個隊列

初始化一個隊列,須要定義前綴名, 數據路徑,每一個文件的最大字節數,消息最大最小限制,以及刷盤頻次和最長刷盤時間,最後還有一個日誌函數緩存

func New(name string, dataPath string, maxBytesPerFile int64,
    minMsgSize int32, maxMsgSize int32,
    syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
    d := diskQueue{
        name:              name,
        dataPath:          dataPath,
        maxBytesPerFile:   maxBytesPerFile,
        minMsgSize:        minMsgSize,
        maxMsgSize:        maxMsgSize,
        readChan:          make(chan []byte),
        writeChan:         make(chan []byte),
        writeResponseChan: make(chan error),
        emptyChan:         make(chan int),
        emptyResponseChan: make(chan error),
        exitChan:          make(chan int),
        exitSyncChan:      make(chan int),
        syncEvery:         syncEvery,
        syncTimeout:       syncTimeout,
        logf:              logf,
    }

    // no need to lock here, nothing else could possibly be touching this instance
    err := d.retrieveMetaData()
    if err != nil && !os.IsNotExist(err) {
        d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
    }

    go d.ioLoop()
    return &d
}

能夠看出, 隊列中均使用不帶cache 的chan,消息只能阻塞處理。數據結構

d.retrieveMetaData() 是從文件中恢復元數據。函數

d.ioLoop() 是隊列的事件處理邏輯,後文詳細解答oop

消息的讀寫

文件格式

文件名 "name" + .diskqueue.%06d.dat 其中, name 是 topic, 或者topic + channel 命名.
數據採用二進制方式存儲, 消息大小+ body 的形式存儲。學習

消息讀操做

  • 若是readFile 文件描述符未初始化, 則須要先打開相應的文件,將偏移seek到相應位置,並初始化reader buffer
  • 初始化後,首先讀取文件的大小, 4個字節,而後經過文件大小獲取相應的body 數據
  • 更改相應的偏移。若是偏移達到文件最大值,則會關閉相應文件,讀的文件編號 + 1

消息寫操做

  • 若是writeFile 文件描述符未初始化,則須要先打開相應的文件,將偏移seek到文件末尾。
  • 驗證消息的大小是否符合要求
  • 將body 的大小和body 寫入 buffer 中,並落地
  • depth +1,
  • 若是文件大小大於每一個文件的最大大小,則關閉當前文件,並將寫文件的編號 + 1

事件循環 ioLoop

ioLoop 函數,作全部時間處理的操做,包括:this

  • 消息讀取
  • 寫操做
  • 清空隊列數據
  • 定時刷新的事件
func (d *diskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte

    // 定時器的設置
    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        // 若到達刷盤頻次,標記等待刷盤
        if count == d.syncEvery {
            d.needSync = true
        }

        if d.needSync {
            err = d.sync()
            if err != nil {
                d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
            }
            count = 0
        }

        // 有可讀數據,而且當前讀chan的數據已經被讀走,則讀取下一條數據
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err)
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            // 若是無可讀數據,那麼設置 r 爲nil, 防止將dataRead數據重複傳入readChan中
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        case r <- dataRead:
            count++
            // moveForward sets needSync flag if a file is removed
            // 若是讀chan 被寫入成功,則會修改讀的偏移
            d.moveForward()
        case <-d.emptyChan:
            // 清空全部文件,並返回empty的結果
            d.emptyResponseChan <- d.deleteAllFiles()
            count = 0
        case dataWrite := <-d.writeChan:
            // 寫msg
            count++
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            // 到刷盤時間,則修改needSync = true
            if count == 0 {
                // avoid sync when there's no activity
                continue
            }
            d.needSync = true
        case <-d.exitChan:
            goto exit
        }
    }

exit:
    d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}

須要注意的點:atom

  1. 數據會預先讀出來,當發送到readChan 裏面,纔會經過moveForward 操做更改讀的偏移。
  2. queue 的Put 操做非操做,會等待寫完成後,纔會返回結果
  3. Empty 操做會清空全部數據
  4. 數據會定時或者按照設定的同步頻次調用FSync 刷盤

metadata 元數據

metadata 文件格式

文件名: "name" + .diskqueue.meta.dat 其中, name 是 topic, 或者topic + channel 命名.

metadata 數據包含5個字段, 內容以下:

depth\nreadFileNum,readPos\nwriteFileNum,writePos

metadata 做用

當服務關閉後,metadata 數據將保存在文件中。當服務再次啓動時,將從文件中將相關數據恢復到內存中。

學習總結

內存對齊與原子操做的問題

// 64bit atomic vars need to be first for proper alignment on 32bit platforms
  • 現象 nsq 在定義struct 的時候,不少會出現相似的註釋

  • 緣由 緣由在golang 源碼 sync/atomic/doc.go 中

    // On ARM, x86-32, and 32-bit MIPS,
    // it is the caller's responsibility to arrange for 64-bit
    // alignment of 64-bit words accessed atomically. The first word in a
    // variable or in an allocated struct, array, or slice can be relied upon to be
    // 64-bit aligned.
  • 解釋 在arm, 32 x86系統,和 32位 MIPS 指令集中,調用者須要保證對64位變量作原子操做時是64位內存對齊的(而不是32位對齊)。而將64位的變量放在struct, array, slice 的最前面,能夠保證64位對齊

  • 結論 有64bit 原子操做的變量,會定義在struct 的最前面,可使變量使64位對齊,保證程序在32位系統中正確執行

對象池的使用

  • buffer_pool.go 文件中, 簡單實現了 bytes.Buffer 的對象池,減小了gc 壓力
  • 使用場景,須要高頻次作對象初始化和內存分配的狀況,可以使用sync.Pool 對象池減小gc 壓力

如何將操做系統緩存中的數據主動刷新到硬盤中?

  • fsync 函數 (在write 函數以後,須要使用fsync 才能確保數據落盤)

本文代碼來自於 github.com/nsqio/go-diskqueue

相關文章
相關標籤/搜索