nsq裏面queueScanLoop實現原理

nsq裏面queueScanLoop函數負責處理延遲消息算法

算法原型是借鑑Redis probabilistic expiration algorithmdom

 It copies Redis's probabilistic expiration algorithm: it wakes up every // QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount // (default: 20) channels from a locally cached list (refreshed every // QueueScanRefreshInterval (default: 5s))ide

If either of the queues had work to do the channel is considered "dirty".函數

If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty, the loop continues without sleepoop

看看代碼具體實現atom

func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)//默認是100ms
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) //默認是5s

	channels := n.channels()
    //從新調整Pool的大小
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
            //從新調整Pool的大小
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
        //速記選取channel希爾workCh裏面去
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}
        
        //判斷dirty的個數
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}
        
        //dirty的個數佔總數的百分比
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf(LOG_INFO, "QUEUESCAN: closing")
	close(closeCh)
    //關閉兩個channel
	workTicker.Stop()
	refreshTicker.Stop()
}

看看怎麼進行的resizePoolidea

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//

//傳入的num參數是len(channels)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// poolSize過大就縮小
            //直接寫入closeCh,work就關閉了
			closeCh <- 1
			n.poolSize--
		} else {
			// poolSize太小就擴張
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
            //已經發送的消息處理
			if c.processInFlightQueue(now) {
				dirty = true
			}
            //deferred延遲發送消息處理
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
            //寫入closeCh,則關閉了worker
			return
		}
	}
}

最後看看processInFlightQueue和processDeferredQueue兩個操做code

func (c *Channel) processDeferredQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.deferredMutex.Lock()
        //最小堆裏面移出知足優先級的元素,優先級爲時間戳
		item, _ := c.deferredPQ.PeekAndShift(t)
		c.deferredMutex.Unlock()
         
        //沒有元素則dirty返回false,work至關於沒有作處理
		if item == nil {
			goto exit
		}

        //pop出延遲的消息,發送消息,dirty設置爲true
		dirty = true

		msg := item.Value.(*Message)
		_, err := c.popDeferredMessage(msg.ID)
		if err != nil {
			goto exit
		}
		c.put(msg)
	}

exit:
	return dirty
}

func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
        //須要經過msg找出對應發送給的client
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

exit:
	return dirty
}
相關文章
相關標籤/搜索