NSQ 源碼閱讀 (四) diskqueue 文件讀寫

引言

咱們在這個系列第一篇文章中提到過,若是須要消息落地而對存儲子系統的選擇上,從速度上來講 文件系統>分佈式KV(持久化)>分佈式文件系統>數據庫。而NSQ選擇了文件系統做爲存儲子系統。這篇文章將重點介紹nsq 對於文件的操做。數據庫

什麼時候寫入文件?

在內存的msg chan buffer 已滿的時候,會將msg 寫入文件,代碼以下:數組

func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
        if err != nil {
            c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
                c.name, err)
            return err
        }
    }
    return nil
}

寫一條message

func (d *diskQueue) writeOne(data []byte) error {

diskQueue維護了寫文件和寫文件的offset分佈式

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
    ...
    writePos     int64
    ...
    writeFile *os.File
    ...
}

利用Seek 函數將寫文件的偏移量設置爲writePos:函數

if d.writePos > 0 {
            _, err = d.writeFile.Seek(d.writePos, 0)

而後以二進制的方式寫入data的size:code

dataLen := int32(len(data))
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)

此處的巧妙在於binary.Write會根據寫入數據的類型寫入一段固定大小的數據。此處dataLen 是int32,因此會寫入一段4個byte的數據來表示data的size。讀取的時候先讀一段4個byte的數據就知道了data的size。
以後寫入data:內存

_, err = d.writeBuf.Write(data)
_, err = d.writeFile.Write(d.writeBuf.Bytes())

讀一條message

readOne函數以byte 數組的形式讀一條message 出來it

func (d *diskQueue) readOne() ([]byte, error) {

diskQueue 維護了當前讀取的文件和文件的offsetio

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
readPos      int64
...
readFile  *os.File

利用Seek 函數將當前文件的偏移量設置爲readPos:file

if d.readPos > 0 {
            _, err = d.readFile.Seek(d.readPos, 0)
            if err != nil {
                d.readFile.Close()
                d.readFile = nil
                return nil, err
            }
        }

先把一個message的大小讀出來:select

err = binary.Read(d.reader, binary.BigEndian, &msgSize)

msgSize 和寫文件時候的dataLen都是int32類型
有了msgSize, 定義一段msgSize大小的buffer,從文件裏讀一段數據來填滿這個buffer,buffer裏面的數據就是一條message

readBuf := make([]byte, msgSize)
    _, err = io.ReadFull(d.reader, readBuf)
相關文章
相關標籤/搜索