目錄git
NSQ包含3個組件:github
生產者將消息寫入到指定的主題Topic,同一個Topic下則能夠關聯多個管道Channel,每一個Channel都會傳輸對應Topic的完整副本。
消費者則訂閱Channel的消息。因而多個消費者訂閱不一樣的Channel的話,他們各自都能拿到完整的消息副本;但若是多個消費者訂閱同一個Channel,則是共享的,即消息會隨機發送給其中一個消費者。web
接下來咱們來分析下nsq的源碼:sql
nsq各組件均使用上述代碼倉庫,經過apps目錄下的不一樣的main包啓動。好比nsqd的main函數在apps/nsqd目錄下,其餘類同。windows
本文檔主要分析nsqd的主要結構體和方法,及消息生產和消費的過程。主要以TCP api爲例來分析,HTTP/HTTPS的api類同。api
nsqd/nsqd.go文件,NSQD是主實例,一個nsqd進程建立一個nsqd結構體實例,並經過此結構體的Main()方法啓動全部的服務。數組
type NSQD struct { clientIDSequence int64 // 遞增的客戶端ID,每一個客戶端鏈接均從這裏取一個遞增後的ID做爲惟一標識 sync.RWMutex opts atomic.Value // 參數選項,真實類型是apps/nsqd/option.go:Options結構體 dl *dirlock.DirLock isLoading int32 errValue atomic.Value startTime time.Time topicMap map[string]*Topic // 保存當前全部的topic clientLock sync.RWMutex clients map[int64]Client lookupPeers atomic.Value tcpServer *tcpServer tcpListener net.Listener httpListener net.Listener httpsListener net.Listener tlsConfig *tls.Config poolSize int // 當前工做協程組的協程數量 notifyChan chan interface{} optsNotificationChan chan struct{} exitChan chan int waitGroup util.WaitGroupWrapper ci *clusterinfo.ClusterInfo }
主要方法緩存
/* 程序啓動時調用本方法,執行下面的動做: - 啓動TCP/HTTP/HTTPS服務 - 啓動工做協程組:NSQD.queueScanLoop - 啓動服務註冊:NSQD.lookupLoop */ func (n *NSQD) Main() error // 負責管理工做協程組的數量,每調用一次NSQD.queueScanWorker()方法啓動一個工做協程 func (n *NSQD) queueScanLoop() // 由queueScanLoop()調用,負責啓動工做協程組並動態調整協程數量。工做協程的數量爲當前的channel數 * 0.25 func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) // 這是具體的工做協程,監聽workCh,對收到的待處理Channel作兩個動做,一是將超時的消息從新入隊;二是將到時間的延時消息入隊 func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) /* lookupLoop()方法與nsqlookupd創建鏈接,負責向nsqlookupd註冊topic,並定時發送心跳包 */ func (n *NSQD) lookupLoop()
nsqd/tcp.go文件,tcpServer經過Handle()方法接收TCP請求。
tcpServer是nsqd結構的成員,全局也就只有一個實例,但在protocol包的TCPServer方法中,每建立一個新的鏈接,均會調用一次tcpServer.Handle()app
type tcpServer struct { ctx *context conns sync.Map }
主要方法tcp
/* p.nsqd.Main()啓動protocol.TCPServer(),這個方法裏會爲每一個客戶端鏈接建立一個新協程,協程執行tcpServer.Handle()方法 本方法首先對新鏈接讀取4字節校驗版本,新鏈接必須首先發送4字節" V2"。 而後阻塞調用nsqd.protocolV2.IOLoop()處理客戶端接下來的請求。 */ func (p *tcpServer) Handle(clientConn net.Conn)
nsqd/protocol_v2.go文件,protocolV2負責處理過來的具體的用戶請求。
每一個鏈接均會建立一個獨立的protocolV2實例(由tcpServer.Handle()建立)
type protocolV2 struct { ctx *context }
主要方法
/* tcpServer.Handle()阻塞調用本方法 1. 啓用一個獨立協程向消費者推送消息protocolV2.messagePump() 2. for循環接收並處理客戶端請求protocolV2.Exec() */ func (p *protocolV2) IOLoop(conn net.Conn) error // 組裝消息並調用protocolV2.Send()發送給消費者 func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error // 向客戶端發送數據幀 func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error // 解析客戶端請求的指令,調用對應的指令方法 func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) // 負責向消費者推送消息 func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) // 下面這組方法是NSQD支持的指令對應的處理方法 func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) AUTH(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error)
nsqd/client_v2.go文件,保存每一個客戶端的鏈接信息。
clientV2實例由protocolV2.IOLoop()建立,每一個鏈接均有一個獨立的實例。
type clientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms ReadyCount int64 InFlightCount int64 MessageCount uint64 FinishCount uint64 RequeueCount uint64 pubCounts map[string]uint64 writeLock sync.RWMutex metaLock sync.RWMutex ID int64 ctx *context UserAgent string // original connection net.Conn // connections based on negotiated features tlsConn *tls.Conn flateWriter *flate.Writer // reading/writing interfaces Reader *bufio.Reader Writer *bufio.Writer OutputBufferSize int OutputBufferTimeout time.Duration HeartbeatInterval time.Duration MsgTimeout time.Duration State int32 ConnectTime time.Time Channel *Channel ReadyStateChan chan int ExitChan chan int ClientID string Hostname string SampleRate int32 IdentifyEventChan chan identifyEvent SubEventChan chan *Channel TLS int32 Snappy int32 Deflate int32 // re-usable buffer for reading the 4-byte lengths off the wire lenBuf [4]byte lenSlice []byte AuthSecret string AuthState *auth.State }
nsqd/topic.go文件,對應於每一個topic實例,由系統啓動時建立或者發佈消息/消費消息時自動建立。
type Topic struct { messageCount uint64 // 累計消息數 messageBytes uint64 // 累計消息體的字節數 sync.RWMutex name string // topic名,生產和消費時須要指定此名稱 channelMap map[string]*Channel // 保存每一個channel name和channel指針的映射 backend BackendQueue // 磁盤隊列,當內存memoryMsgChan滿時,寫入硬盤隊列 memoryMsgChan chan *Message // 消息優先存入這個內存chan startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 idFactory *guidFactory ephemeral bool deleteCallback func(*Topic) deleter sync.Once paused int32 pauseChan chan int ctx *context }
主要方法
/* 下面兩個方法負責將消息寫入topic,底層均調用topic.put()方法 1. topic.memoryMsgChan未滿時,優先寫入內存memoryMsgChan 2. 不然,寫入磁盤topic.backend */ func (t *Topic) PutMessage(m *Message) error func (t *Topic) PutMessages(msgs []*Message) error /* NewTopic建立新的topic時會爲每一個topic啓動一個獨立線程來處理消息推送,即messagePump() 此方法循環隨機從內存memoryMsgChan和磁盤隊列backend中取消息寫入到topic下每個chnnel中 */ func (t *Topic) messagePump()
nsqd/channel.go文件,對應於每一個channel實例
type Channel struct { requeueCount uint64 messageCount uint64 timeoutCount uint64 sync.RWMutex topicName string name string ctx *context backend BackendQueue // 磁盤隊列,當內存memoryMsgChan滿時,寫入硬盤隊列 memoryMsgChan chan *Message // 消息優先存入這個內存chan exitFlag int32 exitMutex sync.RWMutex // state tracking clients map[int64]Consumer paused int32 ephemeral bool deleteCallback func(*Channel) deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile // TODO: these can be DRYd up deferredMessages map[MessageID]*pqueue.Item // 保存還沒有到時間的延遲消費消息 deferredPQ pqueue.PriorityQueue // 保存還沒有到時間的延遲消費消息,最小堆 deferredMutex sync.Mutex inFlightMessages map[MessageID]*Message // 保存已推送還沒有收到FIN的消息 inFlightPQ inFlightPqueue // 保存已推送還沒有收到FIN的消息,最小堆 inFlightMutex sync.Mutex }
主要方法
/* 將消息寫入channel,邏輯與topic的一致,內存未滿則優先寫內存chan,不然寫入磁盤隊列 */ func (c *Channel) PutMessage(m *Message) error func (c *Channel) put(m *Message) error // 消費超時相關 func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error func (c *Channel) pushInFlightMessage(msg *Message) error func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) func (c *Channel) addToInFlightPQ(msg *Message) func (c *Channel) removeFromInFlightPQ(msg *Message) func (c *Channel) processInFlightQueue(t int64) bool // 延時消費相關 func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error func (c *Channel) pushDeferredMessage(item *pqueue.Item) error func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) func (c *Channel) addToDeferredPQ(item *pqueue.Item) func (c *Channel) processDeferredQueue(t int64) bool
nsqd的main函數在apps/nsqd/main.go文件。
啓動時調用了一個第三方包svc,主要做用是攔截syscall.SIGINT/syscall.SIGTERM這兩個信號,最終仍是調用了main.go下的3個方法:
p.nsqd.Main()的邏輯也很簡單,代碼不貼了,依次啓動了TCP服務、HTTP服務、HTTPS服務這3個服務。除此以外,還啓動了如下兩個協程:
TCPServer
protocol包的TCPServer的核心代碼就是下面這幾行,循環等待客戶端鏈接,併爲每一個鏈接建立一個獨立的協程:
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { for { // 等待生產者或消費者鏈接 clientConn, err := listener.Accept() // 每建立一個鏈接wg +1 wg.Add(1) go func() { // 每一個鏈接均啓動一個獨立的協程來接收處理請求 handler.Handle(clientConn) wg.Done() }() } // 等待全部協程退出 wg.Wait() return nil }
TCPServer的核心是爲每一個鏈接啓動的協程處理方法handler.Handle(clientConn),實際調用的是下面這個方法,鏈接創建時先讀取4字節,必須是" V2",而後啓動prot.IOLoop(clientConn)處理接下來的客戶端請求:
func (p *tcpServer) Handle(clientConn net.Conn) { // 不管是生產者仍是消費者,創建鏈接時,必須先發送4字節的" V2"進行版本校驗 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) protocolMagic := string(buf) var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } // 版本校驗經過,保存鏈接信息,key-是ADDR,value是當前鏈接指針 p.conns.Store(clientConn.RemoteAddr(), clientConn) // 啓動 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } p.conns.Delete(clientConn.RemoteAddr()) }
生產者pub消息時,消息會首先寫入對應topic的隊列(內存優先,內存滿了寫磁盤),topic的messagePump()方法再將消息拷貝給每一個channel。
每一個channel均各執一份完整的消息。
1.消息寫入topic
消息生產由生產者調用PUB/MPUB/DPUB這類指令實現,底層都是調用topic.PutMessage(msg),進一步調用topic.put(msg):
func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: // 優先寫入內存memoryMsgChan default: // 當內存case失敗即memoryMsgChan滿時,走default,將msg以字節形式寫入磁盤隊列topic.backend b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
消息寫入topic的邏輯比較簡單,優先寫memoryMsgChan,若是memoryMsgChan滿了,則寫入磁盤隊列topic.backend。
這裏留個思考題:NSQ是否支持不寫內存,所有寫磁盤隊列?
2.topic將消息複製給每一個channel
第二章介紹結構體和方法時,介紹了topic結構體的messagePump()方法,正是這個方法將第1步寫入的消息複製給每一個channel的:
func (t *Topic) messagePump() { /* 準備工做有代碼咱們略過 */ // 主消息處理循環 for { select { case msg = <-memoryMsgChan: case buf = <-backendChan: msg, err = decodeMessage(buf) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } case <-t.channelUpdateChan: chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.pauseChan: if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.exitChan: goto exit } for i, channel := range chans { chanMsg := msg /* channel消費消息時,須要處理延時/超時等問題,因此這裏複製了消息,給每一個channel傳遞的是獨立的消息實例 */ if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } err := channel.PutMessage(chanMsg) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } }
topic.messagePump()方法代碼還蠻長的,前面是些準備工做,主要就是後面的for循環。其中for循環中select的前兩項,memoryMsgChan來源於topic.memoryMsgChan,而backendChan則是topic.backend.ReadChan(),分別對應於內存和磁盤隊列。注意只有這兩個case會往下傳遞消息,其餘的case處理退出和更新機制的,會continue或exit外層的for循環。
雖然通道channel是有序的,但select的case具備隨機性,這就決定了每輪循環讀的是內存仍是磁盤是隨機的,消息的消費順序是不可控的。
select語句獲取的消息,交給第2層for循環處理,邏輯比較簡單,遍歷每個chan,調用channel.PutMessage()寫入。因爲每一個channel對應於不一樣的消費者,有不一樣的延時/超時和消費機制,因此這裏拷貝了message實例。
每一個鏈接均會啓動一個運行protocolV2.messagePump()方法的協程,這個協程負責監聽channel的消息隊列並向客戶端推送消息。客戶端只有觸發SUB指令以後,纔會將channel傳遞給protocolV2.messagePump(),這以後消費推送纔會正式開啓。
啓動消息推送
前面講Tcpserver時有提到,客戶端建立鏈接時,會調用tcpserver.Handle(),裏面再調用protocolV2.IOLoop()。protocolV2.IOLoop()方法開頭有下面這行:
go p.messagePump(client, messagePumpStartedChan)
這行建立了一個獨立線程,調用的protocolV2.messagePump()負責向消費者推送消息。
有個小細節是不管是生產者仍是消費者,都會建立這個協程,protocolV2.messagePump()建立後並不會當即推送消息,而是須要調用SUB指令,以protocolV2.SUB()方法爲例,方法末尾有這麼一行:
client.SubEventChan <- channel
將當前消費者訂閱的channel傳入client.SubEventChan,這個會由protocolV2.messagePump()接收,這個方法核心是下面這個for循環(限於篇幅,我省略了大量無關代碼):
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { for { if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } select { case subChannel = <-subEventChan: // you can't SUB anymore subEventChan = nil case b := <-backendMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } }
客戶端創建鏈接初始,subChannel爲空,循環一直走第1個if語句。直到客戶端調用SUB指令,select語句執行"case subChannel = <-subEventChan:",此時subChannel非空,接下來backendMsgChan和memoryMsgChan被賦值,此後開始推送消息:
當多個消費者訂閱同一個channel時狀況會如何?
上面咱們提到消費者發起SUB指令訂閱消息,protocolV2.SUB()會將chan傳給protocolV2.messagePump(),即這一行「client.SubEventChan <- channel」,那麼咱們來看下這個channel變量怎麼來的:
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { ... topic := p.ctx.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) ... client.SubEventChan <- channel
SUB方法包含多種邏輯:
因爲是多個消費者共用一個通道chan變量,每一個消費者都有一個for select在循環監聽這個通道,根據chan變量的特性,消費會隨機發送給一位消費者,且一條消息只會推送給一個消費者。
消費超時處理
protocolV2.messagePump()方法,不管是「case b := <-backendMsgChan:」仍是「case msg := <-memoryMsgChan:」,在向消費者推送消息前都調用了下面這行代碼:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) // 省略其餘代碼 } func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error { msg.pri = now.Add(timeout).UnixNano() // pri成員保存本消息超時時間 err := c.pushInFlightMessage(msg) c.addToInFlightPQ(msg) }
channel.StartInFlightTimeout()將消息保存到channel的inFlightMessages和inFlightPQ隊列中,這兩個緩存是用來處理消費超時的。
值得注意的一個小細節是c.addToInFlightPQ(msg)將msg壓入最小堆時,將msg在數組的偏移量保存到了msg.index成員中(最小堆底層是數組實現)
咱們先簡單看下FIN指令會作啥:
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { err = client.Channel.FinishMessage(client.ID, *id) // 省略其餘代碼 } func (c *Channel) FinishMessage(clientID int64, id MessageID) error { // 省略其餘代碼 msg, err := c.popInFlightMessage(clientID, id) c.removeFromInFlightPQ(msg) }
FIN的動做比較簡單,主要就是調用channel.FinishMessage()方法把上面寫入超時緩存的msg給刪除掉。
FIN從inFlightMessages中刪除消息比較容易,這是個map,key是msg.id。客戶端發送FIN消息時附帶了msg.id。但如何從最小堆inFlightPQ中刪除對應的msg呢?前面提到在入堆時的一個細節,即保存了msg的偏移量,此時正好用上。經過msg.index直接定位到msg的位置並調整堆便可。
說了這麼多,最小堆的做用是啥?別急,接下來咱們看下超時邏輯:
超時邏輯由程序啓動時開啓的工做線程組來處理,即NSQD.queueScanLoop()方法:
func (n *NSQD) queueScanLoop() { n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: // 定時觸發工做 if len(channels) == 0 { continue } case <-refreshTicker.C: // 動態調整協程組的數量 channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] // 觸發協程組工做 } numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } }
NSQD.queueScanLoop()方法主要有一個for循環,內層是一個select和一個loop循環。select中,第1個定時器case <-workTicker.C的做用是定時觸發工做,只有這個case會跳出select走到下面的loop。第2個定時器負責啓動工做協程組並動態調整協程數量,咱們來看下第2個定時器調用的resizePool()方法:
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) // 協程數量設定爲channel數的1/4 if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { // 當協程數量達到協程數量設定爲channel數的1/4時,退出 break } else if idealPoolSize < n.poolSize { // 不然若是當前協程數大於目標值,則經過closeCh通知部分協程退出 // contract closeCh <- 1 n.poolSize-- } else { // 不然協程數不夠,則啓動新的協程 // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } }
resizePool()方法上面的註釋已經說的很清楚了,做用就是保持工做協程數量爲當前channel數的1/4。
接下來咱們看具體的工做邏輯,queueScanWorker()方法:
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false if c.processInFlightQueue(now) { dirty = true } if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: return } } }
queueScanWorker()方法的代碼很短,一是監聽closeCh的退出信號;二是監聽workCh的工做信號。workCh會將須要處理的channel傳入,而後調用processInFlightQueue()清理超時的消息,調用processDeferredQueue()清理到時間的延時消息:
func (c *Channel) processInFlightQueue(t int64) bool { dirty := false for { c.inFlightMutex.Lock() msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } c.put(msg) } exit: return dirty } func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) { if len(*pq) == 0 { return nil, 0 } x := (*pq)[0] if x.pri > max { return nil, x.pri - max } pq.Pop() return x, 0 }
前面提到msg.pri成員保存本消息超時時間,因此PeekAndShift()返回的是最小堆裏已經超時且超時時間最長的那條消息。processInFlightQueue()則將消息從超時隊列中刪,同時將消息從新put進channel。注意此時超時的消息put進channel後實際是排在隊尾的,消費順序將發生改變。
processInFlightQueue()方法若是存在超時消息,返回值dirty標識true。queueScanWorker()將dirty寫入responseCh。再往回看,queueScanLoop()方法統計了dirty的數量,超過必定比例會繼續執行loop,而不是等待下一次定時執行。
生產者調用DPUB發佈的消息,能夠指定延時多少再推送給消費者。
咱們來看下DPUB的邏輯:
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { timeoutMs, err := protocol.ByteToBase10(params[2]) timeoutDuration := time.Duration(timeoutMs) * time.Millisecond msg := NewMessage(topic.GenerateID(), messageBody) msg.deferred = timeoutDuration err = topic.PutMessage(msg) }
從上面截取的PUB()方法代碼能夠看出,DPUB的消息會將延時時間寫入msg.deferred成員。4.1章節第2部分介紹的Topic.messagePump()方法有下面這段:
func (t *Topic) messagePump() { if chanMsg.deferred != 0 { channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } }
當chanMsg.deferred != 0時表示延時消息,此時不是直接調用putMessage()方法寫入channel,而是調用channel.PutMessageDeferred(chanMsg, chanMsg.deferred),消息被寫入了延時隊列Channel.deferredMessages和Channel.deferredPQ。以後的邏輯是在工做協程組NSQD.queueScanLoop()中被識別並put進channel,這與超時的處理邏輯是同樣的,不展開說。