上兩篇帖子主要說了一下nsq的拓撲結構,如何進行故障處理和橫向擴展,保證了客戶端和服務端的長鏈接,鏈接保持了,就要傳輸數據了,nsq
如何保證消息被訂閱者消費,如何保證消息不丟失,就是今天要闡述的內容。
html
nsq
topic、channel、和消費我客戶端的結構如上圖,一個topic
下有多個channel
每一個channel
能夠被多個客戶端訂閱。
消息處理的大概流程:當一個消息被nsq
接收後,傳給相應的topic
,topic
把消息傳遞給全部的channel
,channel
根據算法選擇一個訂閱客戶端,把消息發送給客戶端進行處理。
看上去這個流程是沒有問題的,咱們來思考幾個問題git
nsq
服務端從新啓動時消息不丟失;以前的帖子說過客戶端和服務端進行鏈接後,會啓動一個gorouting
來發送信息給客戶端github
go p.messagePump(client, messagePumpStartedChan)
而後會監聽客戶端發過來的命令client.Reader.ReadSlice('\n')
服務端會定時檢查client端的鏈接狀態,讀取客戶端發過來的各類命令,發送心跳等。每個鏈接最終的目的就是監聽channel
的消息,發送給客戶端進行消費。
當有消息發送給訂閱客戶端的時候,固然選擇哪一個client
也是有無則的,這個之後講,redis
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // ... for { // ... case b := <-backendMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } // ... }
看一下這個方法調用subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
,在發送給客戶端以前,把這個消息設置爲在飛翔中,算法
// pushInFlightMessage atomically adds a message to the in-flight dictionary func (c *Channel) pushInFlightMessage(msg *Message) error { c.inFlightMutex.Lock() _, ok := c.inFlightMessages[msg.ID] if ok { c.inFlightMutex.Unlock() return errors.New("ID already in flight") } c.inFlightMessages[msg.ID] = msg c.inFlightMutex.Unlock() return nil }
而後發送給客戶端進行處理。
在發送中的數據,存在的各類不肯定性,nsq
的處理方式是:對發送給客戶端信息設置爲在飛翔中,若是在若是處理成功就把這個消息從飛翔中的狀態中去掉,若是在規定的時間內沒有收到客戶端的反饋,則認爲這個消息超時,而後從新歸隊,兩次進行處理。因此不管是哪一種特殊狀況,nsq
統一認爲消息爲超時。服務器
nsq
對超時消息的處理,借鑑了redis
的過時算法,但也不太同樣redis
的更復雜一些,由於redis是單線程的,還要處理佔用cpu
時間等等,nsq
由於gorouting
的存在要很簡單不少。
簡單來講,就是在nsq
啓動的時候啓動協程去處理channel的過時數據網絡
func (n *NSQD) Main() error { // ... // 啓動協程去處理channel的過時數據 n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err }
固然不是每個channel啓動一個協程來處理過時數據,而是有一些規定,咱們看一下一些默認值,而後再展開講算法oop
return &Options{ // ... HTTPClientConnectTimeout: 2 * time.Second, HTTPClientRequestTimeout: 5 * time.Second, // 內存最大隊列數 MemQueueSize: 10000, MaxBytesPerFile: 100 * 1024 * 1024, SyncEvery: 2500, SyncTimeout: 2 * time.Second, // 掃描channel的時間間隔 QueueScanInterval: 100 * time.Millisecond, // 刷新掃描的時間間隔 QueueScanRefreshInterval: 5 * time.Second, QueueScanSelectionCount: 20, // 最大的掃描池數量 QueueScanWorkerPoolMax: 4, // 標識百分比 QueueScanDirtyPercent: 0.25, // 消息超時 MsgTimeout: 60 * time.Second, MaxMsgTimeout: 15 * time.Minute, MaxMsgSize: 1024 * 1024, MaxBodySize: 5 * 1024 * 1024, MaxReqTimeout: 1 * time.Hour, ClientTimeout: 60 * time.Second, // ... }
這些參數均可以在啓動nsq
的時候根據本身須要來指定,咱們主要說一下這幾個:atom
QueueScanWorkerPoolMax
就是最大協程數,默認是4
,這個數是掃描全部channel的最大協程數,固然channel
的數量小於這個參數的話,就調整協程的數量,以最小的爲準,好比channel
的數量爲2
個,而默認的是4個,那就調掃描的數量爲2
個QueueScanSelectionCount
每次掃描最大的channel
數量,默認是20
,若是channel
的數量小於這個值,則以channel
的數量爲準。QueueScanDirtyPercent
標識髒數據 channel
的百分比,默認爲0.25
,eg: channel
數量爲10
,則一次最多掃描10
個,查看每一個channel
是否有過時的數據,若是有,則標記爲這個channel是有髒數據的,若是有髒數據的channel的數量 佔此次掃描的10
個channel的比例超過這個百分比,則直接再次進行掃描一次,而不用等到下一次時間點。QueueScanInterval
掃描channel的時間間隔,默認的是每100毫秒掃描一次。QueueScanRefreshInterval
刷新掃描的時間間隔 目前的處理方式是調整channel的協程數量。nsq
處理過時數據的算法,總結一下就是,使用協程定時去掃描隨機的channel
裏是否有過時數據。func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: // 隨機channel for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop() }
在掃描channel
的時候,若是發現有過時數據後,會從新放回到隊列,進行重發
操做。線程
func (c *Channel) processInFlightQueue(t int64) bool { // ... for { c.inFlightMutex.Lock() msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } //從新放回隊列進行消費處理。 c.put(msg) } exit: return dirty }
以前的帖子中的例子中有說過,客戶端要消費消息,須要實現接口
type Handler interface { HandleMessage(message *Message) error }
在服務端發送消息給客戶端後,若是在處理業務邏輯時,若是發生錯誤則給服務器發送Requeue
命令告訴服務器,從新發送消息進處理。若是處理成功,則發送Finish
命令
func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } }
服務端收到命令後,對飛翔中的消息進行處理,若是成功則去掉,若是是Requeue
則執行歸隊和重發操做,或者進行defer隊列處理。
默認的狀況下,只有內存隊列不足時MemQueueSize:10000
時,纔會把數據保存到文件內進行持久到硬盤。
select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil
若是將 --mem-queue-size 設置爲 0,全部的消息將會存儲到磁盤。咱們不用擔憂消息會丟失,nsq 內部機制保證在程序關閉時將隊列中的數據持久化到硬盤,重啓後就會恢復。
nsq
本身開發了一個庫go-diskqueue來持久會消息到內存。這個庫的代碼量很少,理解起來也不難,代碼邏輯我想下一篇再講。
看一下保存在硬盤後的樣子: