nsq裏面queueScanLoop函數負責處理延遲消息算法
算法原型是借鑑Redis的probabilistic expiration algorithm
It copies Redis's probabilistic expiration algorithm: it wakes up every QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount (default: 20) channels from a locally cached list (refreshed every QueueScanRefreshInterval (default: 5s)).
If either of a channel's two queues (in-flight or deferred) had work to do, the channel is considered "dirty".
If more than QueueScanDirtyPercent (default: 25%) of the selected channels were dirty, the loop continues without sleeping.
看看代碼具體實現
func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval)//默認是100ms refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) //默認是5s channels := n.channels() //從新調整Pool的大小 n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: //從新調整Pool的大小 channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: //速記選取channel希爾workCh裏面去 for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } //判斷dirty的個數 numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } //dirty的個數佔總數的百分比 if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) //關閉兩個channel workTicker.Stop() refreshTicker.Stop() }
看看怎麼進行的resizePool
// resizePool adjusts the size of the pool of queueScanWorker goroutines // // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax) // //傳入的num參數是len(channels) func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // poolSize過大就縮小 //直接寫入closeCh,work就關閉了 closeCh <- 1 n.poolSize-- } else { // poolSize太小就擴張 n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } } func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false //已經發送的消息處理 if c.processInFlightQueue(now) { dirty = true } //deferred延遲發送消息處理 if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: //寫入closeCh,則關閉了worker return } } }
最後看看processInFlightQueue和processDeferredQueue兩個操作
func (c *Channel) processDeferredQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.deferredMutex.Lock() //最小堆裏面移出知足優先級的元素,優先級爲時間戳 item, _ := c.deferredPQ.PeekAndShift(t) c.deferredMutex.Unlock() //沒有元素則dirty返回false,work至關於沒有作處理 if item == nil { goto exit } //pop出延遲的消息,發送消息,dirty設置爲true dirty = true msg := item.Value.(*Message) _, err := c.popDeferredMessage(msg.ID) if err != nil { goto exit } c.put(msg) } exit: return dirty } func (c *Channel) processInFlightQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.inFlightMutex.Lock() msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() //須要經過msg找出對應發送給的client client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } c.put(msg) } exit: return dirty }