剖析nsq消息隊列(三) 消息傳輸的可靠性和持久化[二]diskqueue

上一篇主要說了一下nsq是如何保證消息被消費端成功消費,大概提了一下消息的持久化,--mem-queue-size 設置爲 0,全部的消息將會存儲到磁盤。
總有人說nsq的持久化問題,消除疑慮的方法就是閱讀原碼作benchmark測試,我的感受nsq仍是很靠譜的。 nsq本身實現了一個先進先出的消息文件隊列go-diskqueue是把消息保存到本地文件內,很值得分析一下他的實現過程。html

總體處理邏輯

go-diskqueue 會啓動一個gorouting進行讀寫數據也就是方法ioLoop
會根據你設置的參數來進行數據的讀寫,流程圖以下 evernotecid://D2602A6B-6F53-4199-885D-97DFC21CBA3E/appyinxiangcom/2479854/ENResource/p1391 git

這個圖畫的也不是特別的準確 ioLoop 用的是 select 並非if else 當有多個條件爲true時,會隨機選一個進行執行github

nsq 生成的數據大體以下: evernotecid://D2602A6B-6F53-4199-885D-97DFC21CBA3E/appyinxiangcom/2479854/ENResource/p1383 緩存

xxxx.diskqueue.meta.dat 元數據保存了未讀消息的長度,讀取和存入數據的編號和讀取位置
xxxx.diskqueue.編號.dat 消息保存的文件,每個消息的存儲:4Byte消息的長度+消息
evernotecid://D2602A6B-6F53-4199-885D-97DFC21CBA3E/appyinxiangcom/2479854/ENResource/p1381

參數說明

一些主要的參數和約束說明 這些參數的使用在後面的處理邏輯中會提到bash

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
	// run-time state (also persisted to disk)
	// 讀取數據的位置    
	readPos      int64
	// 寫入數據的位置
	writePos     int64
	// 讀取文件的編號    
	readFileNum  int64
	// 寫入文件的編號
	writeFileNum int64
	// 未處理的消息總數    
	depth        int64

	// instantiation time metadata
	// 每一個文件的大小限制    
	maxBytesPerFile int64 // currently this cannot change once created
	// 每條消息的最小大小限制    
	minMsgSize      int32
	// 每條消息的最大大小限制    
	maxMsgSize      int32
	// 緩存消息有多少條後進行寫入    
	syncEvery       int64         // number of writes per fsync
	// 自動寫入消息文件的時間間隔    
	syncTimeout     time.Duration // duration of time per fsync
	exitFlag        int32
	needSync        bool

	// keeps track of the position where we have read
	// (but not yet sent over readChan)
	// 下一條消息的位置    
	nextReadPos     int64
	// 下一條消息的文件編號    
	nextReadFileNum int64

	// 讀取的文件
	readFile  *os.File
	// 寫入的文件    
	writeFile *os.File
	// 讀取的buffer    
	reader    *bufio.Reader
	// 寫入的buffer    
	writeBuf  bytes.Buffer

	// exposed via ReadChan()
	// 讀取數據的channel    
	readChan chan []byte

	//.....
}
複製代碼

數據

元數據

讀寫數據信息的元數據保存在xxxxx.diskqueue.meta.data文件內主要用到代碼裏的字段以下 未處理的消息總數 depth 讀取文件的編號 readFileNum 讀取數據的位置 readPos
寫入文件的編號 writeFileNum 寫入數據的位置 writePos
真實數據以下app

15
0,22
3,24
複製代碼

保存元數據信息oop

func (d *diskQueue) persistMetaData() error {
	// ...
	fileName := d.metaDataFileName()
	tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
	// write to tmp file
	f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
	// 元數據信息
	_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
		atomic.LoadInt64(&d.depth),
		d.readFileNum, d.readPos,
		d.writeFileNum, d.writePos)
	// 保存
	f.Sync()
	f.Close()
	// atomically rename
	return os.Rename(tmpFileName, fileName)
}
複製代碼

獲得元數據信息測試

func (d *diskQueue) retrieveMetaData() error {
	// ...
	fileName := d.metaDataFileName()
	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
	// 讀取數據並賦值
	var depth int64
	_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos)
	//...
	atomic.StoreInt64(&d.depth, depth)
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = d.readPos
	return nil
}
複製代碼

消息數據

寫入一條數據

ioLoop 中發現有數據寫入時,會調用writeOne方法,把消息保存到文件內ui

select {
		// ...
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		// ...
複製代碼
func (d *diskQueue) writeOne(data []byte) error {
	var err error

	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		// ...
		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			// ...
		}
	}

	dataLen := int32(len(data))
	// 判斷消息的長度是否合法
	if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
		return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
	}
	d.writeBuf.Reset()
	// 寫入4字節的消息長度,以大端序保存
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}
	// 寫入消息
	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

	// 寫入到文件
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	// ...
	// 計算寫入位置,消息數量加1
	totalBytes := int64(4 + dataLen)
	d.writePos += totalBytes
	atomic.AddInt64(&d.depth, 1)
	// 若是寫入位置大於 單個文件的最大限制, 則持久化文件到硬盤
	if d.writePos > d.maxBytesPerFile {
		d.writeFileNum++
		d.writePos = 0

		// sync every time we start writing to a new file
		err = d.sync()
		// ...
	}
	return err
}

複製代碼

寫入完消息後,會判斷當前的文件大小是否已經已於maxBytesPerFile若是大,就持久化文件到硬盤,而後從新打開一個新編號文件,進行寫入。this

何時持久化文件到硬盤

調用sync()方法會持久化文件到硬盤,而後從新打開一個新編號文件,進行寫入。
有幾個地方調用會調用這個方法:

  • 一個寫入文件的條數達到了syncEvery的值時,也就是初始化時設置的最大的條數。會調用sync()
  • syncTimeout 初始化時設置的同步時間間隔,若是這個時間間隔到了,而且寫入的文件條數>0的時候,會調用sync()
  • 還有就是上面說過的writeOne方法,寫入完消息後,會判斷當前的文件大小是否已經已於maxBytesPerFile若是大,會調用sync()
  • 當讀取文件時,把整個文件讀取完時,會刪除這個文件而且會把needSync 設置爲trueioLoop 會調用sync()
  • 還有就是Close的時候,會調用sync()
func (d *diskQueue) sync() error {
	if d.writeFile != nil {
		// 把數據 flash到硬盤,關閉文件並設置爲 nil
		err := d.writeFile.Sync()
		if err != nil {
			d.writeFile.Close()
			d.writeFile = nil
			return err
		}
	}
	// 保存元數據信息
	err := d.persistMetaData()
	// ...
	d.needSync = false
	return nil
}
複製代碼

讀取一條數據

元數據保存着 讀取文件的編號 readFileNum 和讀取數據的位置 readPos
而且diskQueue暴露出了一個方法來,經過channel來讀取數據

func (d *diskQueue) ReadChan() chan []byte {
	return d.readChan
}
複製代碼

ioLoop裏,當發現讀取位置小於寫入位置 或者讀文件編號小於寫文件編號,而且下一個讀取位置等於當前位置時纔會讀取一條數據,而後放在一個外部全局變量 dataRead 裏,並把 讀取的channel 賦值監聽 r = d.readChan,當外部有人讀取了消息,則進行moveForward操做

func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64
	var r chan []byte
	for {
		// ...
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			if d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					d.handleReadError()
					continue
				}
			}
			r = d.readChan
		} else {
			r = nil
		}

		select {
		// ...
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		// ...
		}
	}

// ...
}

複製代碼

readOne 從文件裏讀取一條消息,4個bit的大小,而後讀取具體的消息。若是讀取位置大於最大文件限制,則close。在moveForward裏會進行刪除操做

func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32
	// 若是readFile是nil,打開一個新的
	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		// ...
		d.reader = bufio.NewReader(d.readFile)
	}
	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	// ...
	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	totalBytes := int64(4 + msgSize)
	// ...
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum
	// 若是讀取位置大於最大文件限制,則close。在moveForward裏會進行刪除操做
	if d.nextReadPos > d.maxBytesPerFile {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}
		d.nextReadFileNum++
		d.nextReadPos = 0
	}
	return readBuf, nil
}

複製代碼

moveForward方法會查看讀取的編號,若是發現下一個編號 和當前的編號不一樣時,則刪除舊的文件。

func (d *diskQueue) moveForward() {
	oldReadFileNum := d.readFileNum
	d.readFileNum = d.nextReadFileNum
	d.readPos = d.nextReadPos
	depth := atomic.AddInt64(&d.depth, -1)

	// see if we need to clean up the old file
	if oldReadFileNum != d.nextReadFileNum {
		// sync every time we start reading from a new file
		d.needSync = true

		fn := d.fileName(oldReadFileNum)
		err := os.Remove(fn)
		// ...
	}
	d.checkTailCorruption(depth)

複製代碼
相關文章
相關標籤/搜索