咱們在這個系列第一篇文章中提到過,若是須要消息落地而對存儲子系統的選擇上,從速度上來講 文件系統>分佈式KV(持久化)>分佈式文件系統>數據庫
。而NSQ選擇了文件系統做爲存儲子系統。這篇文章將重點介紹nsq 對於文件的操做。數據庫
在內存的msg chan buffer 已滿的時候,會將msg 寫入文件,代碼以下:數組
func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil }
func (d *diskQueue) writeOne(data []byte) error {
diskQueue
維護了寫文件和寫文件的offset分佈式
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { ... writePos int64 ... writeFile *os.File ... }
利用Seek 函數將寫文件的偏移量設置爲writePos
:函數
if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0)
而後以二進制的方式寫入data的size:code
dataLen := int32(len(data)) d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
此處的巧妙在於binary.Write
會根據寫入數據的類型寫入一段固定大小的數據。此處dataLen 是int32,因此會寫入一段4個byte的數據來表示data的size。讀取的時候先讀一段4個byte的數據就知道了data的size。
以後寫入data:內存
_, err = d.writeBuf.Write(data) _, err = d.writeFile.Write(d.writeBuf.Bytes())
readOne
函數以byte 數組的形式讀一條message 出來it
func (d *diskQueue) readOne() ([]byte, error) {
diskQueue
維護了當前讀取的文件和文件的offsetio
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { readPos int64 ... readFile *os.File
利用Seek 函數將當前文件的偏移量設置爲readPos:file
if d.readPos > 0 { _, err = d.readFile.Seek(d.readPos, 0) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } }
先把一個message的大小讀出來:select
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
msgSize
和寫文件時候的dataLen
都是int32
類型
有了msgSize
, 定義一段msgSize
大小的buffer,從文件裏讀一段數據來填滿這個buffer,buffer裏面的數據就是一條message
readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf)