nsqd爲nsq daemon的簡寫,是nsq組件最主要的服務。
nsqd提供一個tcp服務、一個http服務以及一個可選的https服務,tcp服務於客戶端(生產者或消費者),http則提供API(可用於建立、刪除topic與channel,生產數據,清空數據等)。算法
nsqd的啓動入口爲apps/nsqd/nsqd.go文件裏的main函數。
首先定義了一個program的結構體,用於對程序的控制。結構體內元素爲指向NSQD的指針。
main函數裏面定義了一個具體的prg,而後Run它。
Run函數負責啓動prg並阻塞,直至接收到對應的信號(對於nsqd爲SIGINT或者SIGTERM信號)。緩存
type program struct { nsqd *nsqd.NSQD } func main() { //定義一個具體的prg,並啓動它 prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } }
Run函數會調用program的Start方法,並調用Main()來啓動nsqd服務。併發
func (p *program) Start() error { opts := nsqd.NewOptions() ... //配置讀取過程,會修改opt nsqd := nsqd.New(opts) ... //Metadata的處理,之後再說 nsqd.Main() //啓動nsqd服務 p.nsqd = nsqd return nil }
nsqd首先啓動一個Tcp服務、一個Http服務以及一個可選的Https服務,而後調用queueScanLoop函數來處理in-flight與defered數據。app
func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener ctx := &context{n} //連續啓動Tcp、Https、Http服務 tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) ... if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) ... } httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) ... n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } }
客戶端鏈接nsqd的tcp server之後,nsqd會啓動一個IOLoop,IOLoop裏面首先啓動messagePump,而後啓動循環處理後續請求。
messagePump負責將Channel裏面的消息取出來,並push給客戶端。tcp
func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) // messagePump初始化會用到client的一些參數,這裏的messagePumpStartedChan保證了初始化完成之後纔會接收新的請求,避免了IDENTIFY請求對client的參數可能進行的修改。 messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan for { ... //讀取下一次請求 line, err = client.Reader.ReadSlice('\n') ... params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) //處理請求 response, err = p.Exec(client, params) ... if response != nil { //發送響應 err = p.Send(client, frameTypeResponse, response) ... } } ... }
調用http的"/topic/create"接口、"/pub"接口,tcp的SUB/PUB請求等都會觸發topic的建立。
建立topic位於NSQD的GetTopic方法。
首先使用讀鎖判斷topic是否存在,若是存在則直接返回;若是不存在,則加寫鎖,而後調用NewTopic函數建立新的topic。ide
func (n *NSQD) GetTopic(topicName string) *Topic { //讀鎖判斷topic是否存在,若是存在則直接返回 // most likely, we already have this topic, so try read lock first. n.RLock() t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } n.Lock() //獲取寫鎖後再次判斷topic是否存在,若是存在則直接返回 t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } //建立topic t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // release our global nsqd lock, and switch to a more granular topic lock while we init our // channels from lookupd. This blocks concurrent PutMessages to this topic. t.Lock() n.Unlock() // 使用lookup的相關處理,若是不使用能夠先忽略 ... t.Unlock() // 觸發messagePump更新channel狀態 select { case t.channelUpdateChan <- 1: case <-t.exitChan: } return t }
調用http的"/pub"接口,或者tcp的PUB操做,均可以將消息發送給nsqd,nsqd首先將消息存入topic中做爲過渡。
兩種處理過程分別在(s httpServer)的doPUB方法與(p protocolV2)的PUB方法,兩者異曲同工,都會調用(t *Topic)的PutMessage方法,將消息寫入topic中。函數
func (t *Topic) PutMessage(m *Message) error { ... succ, err := t.put(m) ... } func (t *Topic) put(m *Message) (succ bool, err error) { if t.putMode == PUTMODE_NORMAL{ select { //將消息寫入到Topic的memoryMsgChan case t.memoryMsgChan <- m: default: t.put2Disk(m) } }else{ ... } }
消費者的「訂閱」(SUB)請求會觸發channel的建立,在tcp服務的SUB處理裏面。oop
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { ... topicName := string(params[1]) ... channelName := string(params[2]) ... // 防止該topic或者channel是正處於退出狀態 var channel *Channel for { //獲取或者建立topic topic := p.ctx.nsqd.GetTopic(topicName) //獲取或者建立channel channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { channel.RemoveClient(client.ID) time.Sleep(1 * time.Millisecond) continue } break } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // 將channel告知client client.SubEventChan <- channel return okBytes, nil }
topic建立時調用的NewTopic函數會啓動messagePump函數,負責更新channel,並將topic中的消息複製到全部channel。this
// messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic func (t *Topic) messagePump() { //獲取全部channel t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) > 0 { memoryMsgChan = t.memoryMsgChan limitedMsgChan = t.limitedMsgChan backendChan = t.backend.ReadChan() } for { select { //接收消息 case msg = <-memoryMsgChan: if msg == nil { continue } ... //更新channel case <-t.channelUpdateChan: chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { limitedMsgChan = t.limitedMsgChan memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue ... case <-t.exitChan: goto exit } if msg == nil { continue } //將消息發送到全部channel for i, channel := range chans { ... chanMsg := msg //爲每個channel單獨複製一份數據 if i > 0 { chanMsg = NewMessage(msg.ID,msg.MsgType, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } ... //將消息存儲到channel //此處的PutMessage與Topic的同名方法相似,也是將消息寫到channel的memoryMsgChan err := channel.PutMessage(chanMsg) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } } exit: t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name) } //PutMessage會調用到c.put(msg) func (c *Channel) put(m *Message) (succ bool, err error) { succ = true if c.putMode == PUTMODE_NORMAL { select { //將數據發送到Channel的memoryMsgChan case c.memoryMsgChan <- m: default: ... } }else{ ... } }
回憶下前面介紹的兩點:一是客戶端鏈接時,會啓動messagePump負責將Channel裏面的消息取出來,並push給客戶端;二是channel建立時,會將建立的channel告知client。
messagePump獲取建立的這個channel,並從channel的memoryMsgChan接收消息,而後push給消費者。atom
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { ... //channel建立成功後,經過SubEventChan告知client subEventChan := client.SubEventChan for { if subChannel == nil || !client.IsReadyForMessages() { ... } else if flushed { ... } else { // 獲取channel的memoryMsgChan memoryMsgChan = subChannel.memoryMsgChan ... } select { ... case subChannel = <-subEventChan: p.ctx.nsqd.logf(LOG_INFO, "get subEventChan:%+v", subChannel) // you can't SUB anymore subEventChan = nil ... //從memoryMsgChan裏接收消息,並push給客戶端 case msg := <-memoryMsgChan: if msg == nil{ continue } if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ //將消息放入in-flight隊列 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() p.ctx.nsqd.logf(LOG_INFO, "get memory msg:%+v", msg) //將消息push給消費者 err = p.SendMessage(client, msg, &buf) if err != nil { goto exit } flushed = false ... case <-client.ExitChan: goto exit } } exit: ... }
回一下前面講到兩點:一是queueScanLoop函數會處理in-flight數據與deferred數據;二是消息push給消費者以前會調用StartInFlightTimeout將該消息放入in-flight隊列。
queueScanLoop管理一個queueScanWorker pool(默認大小爲4),各個worker併發處理channel數據。
in-flight數據存儲時會記錄下該消息的到期時間,以便到期後將該消息從新push給消費者。
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error { now := time.Now() msg.clientID = clientID msg.deliveryTS = now //存儲到期時間 msg.pri = now.Add(timeout).UnixNano() err := c.pushInFlightMessage(msg) if err != nil { return err } c.addToInFlightPQ(msg) return nil }
若是消費者成功接收,則會迴應一個"FIN",nsqd收到"FIN"則將該消息從in-flight隊列中清除。
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { ... id, err := getMessageID(params[1]) if err != nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error()) } //將該消息從in-flight隊列中清除 err = client.Channel.FinishMessage(client.ID, *id) if err != nil { return nil, protocol.NewClientErr(err, "E_FIN_FAILED", fmt.Sprintf("FIN %s failed %s", *id, err.Error())) } client.FinishedMessage() return nil, nil }
若是消費者收到消息之後,若是一時間本身處理不過來,能夠經過"REQ"將該消息從新入隊,並能夠設定多長時間後從新消費,時間爲0的話則當即消費,不然延遲消費。
延遲消費的處理方式與in-flight數據相似,也是先寫入到一個隊列,並設定到期時間,等待從新讀取。
下面介紹這兩部分數據時如何從新消費的,主要是queueScanLoop的處理邏輯。
worker的建立與銷燬是在resizePool函數。
worker的完美個數爲channel總數的四分之一,可是不能大於QueueScanWorkerPoolMax。
1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
全部的worker都會監聽同一個workCh、closeCh,若是worker過多,則只須要向closeCh寫入一個「通知」,收到這個「通知」的worker就會被銷燬。
一次for循環只建立或銷燬一個worker,直至worker數目達到idealPoolSize。
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } }
queueScanLoop的處理方法模仿了Redis的機率到期算法(probabilistic expiration algorithm):每過一個QueueScanInterval(默認100ms)間隔,進行一次機率選擇,從全部的channel緩存中隨機選擇QueueScanSelectionCount(默認20)個channel,若是某個被選中channel的任何一個queue有事可作,則認爲該channel爲「髒」channel。若是被選中channel中「髒」channel的比例大於QueueScanDirtyPercent(默認25%),則不投入睡眠,直接進行下一次機率選擇。
channel緩存每QueueScanRefreshInterval(默認5s)刷新一次。
queueScanLoop與worker之間經過workCh與responseCh來進行交互。
func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) //根據channel數目,建立worker channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: //更新channel緩存,並據此建立或者銷燬worker channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } //workTicker到期,且channels長度不爲0時,會走到這裏。 num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: //隨機選擇num個channel,並傳入workCh for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } //等待這num個channel的處理結果(是否爲「髒」channel) numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } //若是「髒」channel達到必定比例,直接進行下次處理 if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop() }
worker從queueScanLoop接收須要處理的channel,處理該channel的in-flight數據與deferred數據。processInFlightQueue與processDeferredQueue函數都會調用c.put(msg),將數據發送到Channel的memoryMsgChan,進而從新被push到消費者。
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false //處理in-flight消息 if c.processInFlightQueue(now) { dirty = true } //處理defered消息 if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: return } } }
func (c *Channel) processInFlightQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.inFlightMutex.Lock() //獲取超時的消息 msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true //判斷該消息是否屬於這個client _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } //將消息從新寫入channel c.put(msg) } exit: return dirty }
processDeferredQueue的處理與此相似。