【源碼閱讀】Nsqd

Nsqd源碼閱讀

簡介

nsqd爲nsq daemon的簡寫,是nsq組件最主要的服務。
nsqd提供一個tcp服務、一個http服務以及一個可選的https服務,tcp服務於客戶端(生產者或消費者),http則提供API(可用於建立、刪除topic與channel,生產數據,清空數據等)。算法

初始化

nsqd的啓動入口爲apps/nsqd/nsqd.go文件裏的main函數。
首先定義了一個program的結構體,用於對程序的控制。結構體內元素爲指向NSQD的指針。
main函數裏面定義了一個具體的prg,而後Run它。
Run函數負責啓動prg並阻塞,直至接收到對應的信號(對於nsqd爲SIGINT或者SIGTERM信號)。緩存

type program struct {  
   nsqd *nsqd.NSQD  
}

func main() {
   //定義一個具體的prg,並啓動它
   prg := &program{}  
   if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {  
      log.Fatal(err)  
   }  
}

Run函數會調用program的Start方法,並調用Main()來啓動nsqd服務。併發

func (p *program) Start() error {
    opts := nsqd.NewOptions()
    
    ... //配置讀取過程,會修改opt
    
    nsqd := nsqd.New(opts)
    
    ... //Metadata的處理,之後再說
    
    nsqd.Main() //啓動nsqd服務

    p.nsqd = nsqd
    return nil
}

啓動

nsqd首先啓動一個Tcp服務、一個Http服務以及一個可選的Https服務,而後調用queueScanLoop函數來處理in-flight與defered數據。app

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    //連續啓動Tcp、Https、Http服務
    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    ...

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        ...
    }
    
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    ...

    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }
}

客戶端鏈接

客戶端鏈接nsqd的tcp server之後,nsqd會啓動一個IOLoop,IOLoop裏面首先啓動messagePump,而後啓動循環處理後續請求。
messagePump負責將Channel裏面的消息取出來,並push給客戶端。tcp

func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // messagePump初始化會用到client的一些參數,這裏的messagePumpStartedChan保證了初始化完成之後纔會接收新的請求,避免了IDENTIFY請求對client的參數可能進行的修改。
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan
    
    for {
        ...
        //讀取下一次請求
        line, err = client.Reader.ReadSlice('\n')
        ...
        params := bytes.Split(line, separatorBytes)

        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
        
        //處理請求
        response, err = p.Exec(client, params)
        ...
        if response != nil {
            //發送響應
            err = p.Send(client, frameTypeResponse, response)
            ...
        }
    }
    ...
}

數據生產

topic建立

調用http的"/topic/create"接口、"/pub"接口,tcp的SUB/PUB請求等都會觸發topic的建立。
建立topic位於NSQD的GetTopic方法。
首先使用讀鎖判斷topic是否存在,若是存在則直接返回;若是不存在,則加寫鎖,而後調用NewTopic函數建立新的topic。ide

func (n *NSQD) GetTopic(topicName string) *Topic {
    //讀鎖判斷topic是否存在,若是存在則直接返回
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()
    //獲取寫鎖後再次判斷topic是否存在,若是存在則直接返回
    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }
    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    //建立topic
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.logf(LOG_INFO, "TOPIC(%s): created", t.name)

    // release our global nsqd lock, and switch to a more granular topic lock while we init our
    // channels from lookupd. This blocks concurrent PutMessages to this topic.
    t.Lock()
    n.Unlock()

    // 使用lookup的相關處理,若是不使用能夠先忽略
    ...

    t.Unlock()

    // 觸發messagePump更新channel狀態
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }
    return t
}

生產消息

調用http的"/pub"接口,或者tcp的PUB操做,均可以將消息發送給nsqd,nsqd首先將消息存入topic中做爲過渡。
兩種處理過程分別在(s httpServer)的doPUB方法與(p protocolV2)的PUB方法,兩者異曲同工,都會調用(t *Topic)的PutMessage方法,將消息寫入topic中。函數

func (t *Topic) PutMessage(m *Message) error {
    ...
    succ, err := t.put(m)
    ...
}

func (t *Topic) put(m *Message) (succ bool, err error) {
    if t.putMode == PUTMODE_NORMAL{
        select {
        //將消息寫入到Topic的memoryMsgChan
        case t.memoryMsgChan <- m:
        default:
            t.put2Disk(m)
        }
    }else{
    ...
    }
}

數據消費

channel建立

消費者的「訂閱」(SUB)請求會觸發channel的建立,在tcp服務的SUB處理裏面。oop

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    ...

    topicName := string(params[1])
    ...

    channelName := string(params[2])
    ...

    // 防止該topic或者channel是正處於退出狀態
    var channel *Channel
    for {
        //獲取或者建立topic
        topic := p.ctx.nsqd.GetTopic(topicName)
        //獲取或者建立channel
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)

        if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
            channel.RemoveClient(client.ID)
            time.Sleep(1 * time.Millisecond)
            continue
        }
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    // 將channel告知client
    client.SubEventChan <- channel

    return okBytes, nil
}

topic的消息複製到channel

topic建立時調用的NewTopic函數會啓動messagePump函數,負責更新channel,並將topic中的消息複製到全部channel。this

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
    //獲取全部channel
    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        limitedMsgChan = t.limitedMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        //接收消息
        case msg = <-memoryMsgChan:
            if msg == nil {
                continue
            }
        ...
        //更新channel
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                limitedMsgChan = t.limitedMsgChan
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        ...
        case <-t.exitChan:
            goto exit
        }

        if msg == nil {
            continue
        }

        //將消息發送到全部channel
        for i, channel := range chans {
            ...
            chanMsg := msg
            //爲每個channel單獨複製一份數據
            if i > 0 {
                chanMsg = NewMessage(msg.ID,msg.MsgType, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            ...
            //將消息存儲到channel
            //此處的PutMessage與Topic的同名方法相似,也是將消息寫到channel的memoryMsgChan
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

//PutMessage會調用到c.put(msg)
func (c *Channel) put(m *Message) (succ bool, err error) {
    succ = true
    if c.putMode == PUTMODE_NORMAL {
        select {
        //將數據發送到Channel的memoryMsgChan
        case c.memoryMsgChan <- m:
        default:
            ...
        }
    }else{
    ...
    }
}

數據push到消費者

回憶下前面介紹的兩點:一是客戶端鏈接時,會啓動messagePump負責將Channel裏面的消息取出來,並push給客戶端;二是channel建立時,會將建立的channel告知client。
messagePump獲取建立的這個channel,並從channel的memoryMsgChan接收消息,而後push給消費者。atom

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ...

    //channel建立成功後,經過SubEventChan告知client
    subEventChan := client.SubEventChan

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            ...
        } else if flushed {
            ...
        } else {
            // 獲取channel的memoryMsgChan
            memoryMsgChan = subChannel.memoryMsgChan
            ...
        }

        select {
        ...
        case subChannel = <-subEventChan:
            p.ctx.nsqd.logf(LOG_INFO, "get subEventChan:%+v", subChannel)
            // you can't SUB anymore
            subEventChan = nil


        ...
        //從memoryMsgChan裏接收消息,並push給客戶端
        case msg := <-memoryMsgChan:
            if msg == nil{
                continue
            }

            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            //將消息放入in-flight隊列
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            p.ctx.nsqd.logf(LOG_INFO, "get memory msg:%+v", msg)
            //將消息push給消費者
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        ...
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    ...
}

in-flight數據與deferred數據處理

回一下前面講到兩點:一是queueScanLoop函數會處理in-flight數據與deferred數據;二是消息push給消費者以前會調用StartInFlightTimeout將該消息放入in-flight隊列。
queueScanLoop管理一個queueScanWorker pool(默認大小爲4),各個worker併發處理channel數據。

in-flight數據的存儲與清理

in-flight數據存儲時會記錄下該消息的到期時間,以便到期後將該消息從新push給消費者。

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    //存儲到期時間
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}

若是消費者成功接收,則會迴應一個"FIN",nsqd收到"FIN"則將該消息從in-flight隊列中清除。

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
    ...

    id, err := getMessageID(params[1])
    if err != nil {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
    }
    
    //將該消息從in-flight隊列中清除
    err = client.Channel.FinishMessage(client.ID, *id)
    if err != nil {
        return nil, protocol.NewClientErr(err, "E_FIN_FAILED",
            fmt.Sprintf("FIN %s failed %s", *id, err.Error()))
    }

    client.FinishedMessage()

    return nil, nil
}

defered數據的存儲與清理

若是消費者收到消息之後,若是一時間本身處理不過來,能夠經過"REQ"將該消息從新入隊,並能夠設定多長時間後從新消費,時間爲0的話則當即消費,不然延遲消費。
延遲消費的處理方式與in-flight數據相似,也是先寫入到一個隊列,並設定到期時間,等待從新讀取。
下面介紹這兩部分數據時如何從新消費的,主要是queueScanLoop的處理邏輯。

worker的建立與銷燬

worker的建立與銷燬是在resizePool函數。
worker的完美個數爲channel總數的四分之一,可是不能大於QueueScanWorkerPoolMax。

1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)

全部的worker都會監聽同一個workCh、closeCh,若是worker過多,則只須要向closeCh寫入一個「通知」,收到這個「通知」的worker就會被銷燬。
一次for循環只建立或銷燬一個worker,直至worker數目達到idealPoolSize。

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

channel的選擇

queueScanLoop的處理方法模仿了Redis的機率到期算法(probabilistic expiration algorithm):每過一個QueueScanInterval(默認100ms)間隔,進行一次機率選擇,從全部的channel緩存中隨機選擇QueueScanSelectionCount(默認20)個channel,若是某個被選中channel的任何一個queue有事可作,則認爲該channel爲「髒」channel。若是被選中channel中「髒」channel的比例大於QueueScanDirtyPercent(默認25%),則不投入睡眠,直接進行下一次機率選擇。
channel緩存每QueueScanRefreshInterval(默認5s)刷新一次。

queueScanLoop與worker的交互

queueScanLoop與worker之間經過workCh與responseCh來進行交互。

  • workCh:queueScanLoop隨機選擇必定數目的channel後,經過workCh告訴worker。
  • responseCh:worker處理完成後,經過responseCh反饋該channel是否爲「髒」。
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    //根據channel數目,建立worker
    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            //更新channel緩存,並據此建立或者銷燬worker
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        //workTicker到期,且channels長度不爲0時,會走到這裏。
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        //隨機選擇num個channel,並傳入workCh
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        //等待這num個channel的處理結果(是否爲「髒」channel)
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        //若是「髒」channel達到必定比例,直接進行下次處理
        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

worker處理

worker從queueScanLoop接收須要處理的channel,處理該channel的in-flight數據與deferred數據。processInFlightQueue與processDeferredQueue函數都會調用c.put(msg),將數據發送到Channel的memoryMsgChan,進而從新被push到消費者。

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            //處理in-flight消息
            if c.processInFlightQueue(now) {
                dirty = true
            }
            //處理defered消息
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}
func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        //獲取超時的消息
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        dirty = true
        
        //判斷該消息是否屬於這個client
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        //將消息從新寫入channel
        c.put(msg)
    }

exit:
    return dirty
}

processDeferredQueue的處理與此相似。

相關文章
相關標籤/搜索