NSQ Source Code Reading (3): TCP Handler

tcp handler

The tcp handler processes each TCP connection:

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

    // The client should initialize itself by sending a 4 byte sequence indicating
    // the version of the protocol that it intends to communicate, this will allow us
    // to gracefully upgrade the protocol away from text/line oriented to whatever...
    // ztd: the first thing a client sends after establishing a connection is the
    // protocol version it wants to speak; from the code, only V2 is currently supported
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx}
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }
    // ztd: call IOLoop to handle the client connection
    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}
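For intuition, here is a minimal sketch of my own (not nsqd or go-nsq code) of the client side of this handshake: open a TCP connection and write the 4-byte magic "  V2" before anything else. The address 127.0.0.1:4150 is nsqd's default TCP port and is an assumption here.

package main

import (
    "log"
    "net"
)

func main() {
    // assumption: a local nsqd listening on its default TCP port
    conn, err := net.Dial("tcp", "127.0.0.1:4150")
    if err != nil {
        log.Fatalf("dial failed: %s", err)
    }
    defer conn.Close()

    // exactly 4 bytes, two spaces followed by "V2", matching the
    // protocolMagic switch in tcpServer.Handle above
    if _, err := conn.Write([]byte("  V2")); err != nil {
        log.Fatalf("failed to send protocol magic: %s", err)
    }
}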

How IOLoop handles the TCP connection:

func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // synchronize the startup of messagePump in order
    // to guarantee that it gets a chance to initialize
    // goroutine local state derived from client attributes
    // and avoid a potential race with IDENTIFY (where a client
    // could have changed or disabled said attributes)
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan

    for {
        if client.HeartbeatInterval > 0 {
            client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
        } else {
            client.SetReadDeadline(zeroTime)
        }

        // ReadSlice does not allocate new space for the data each request
        // ie. the returned slice is only valid until the next call to it
        line, err = client.Reader.ReadSlice('\n')
        if err != nil {
            if err == io.EOF {
                err = nil
            } else {
                err = fmt.Errorf("failed to read command - %s", err)
            }
            break
        }

        // trim the '\n'
        line = line[:len(line)-1]
        // optionally trim the '\r'
        if len(line) > 0 && line[len(line)-1] == '\r' {
            line = line[:len(line)-1]
        }
        // ztd: the command and its parameters are separated by spaces
        params := bytes.Split(line, separatorBytes)

        if p.ctx.nsqd.getOpts().Verbose {
            p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
        }

        var response []byte
        // ztd: execute the command; different commands dispatch to different handlers, discussed below alongside a typical client
        response, err = p.Exec(client, params)
        if err != nil {
            ctx := ""
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

            sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }
        
        if response != nil {
            err = p.Send(client, frameTypeResponse, response)
            if err != nil {
                err = fmt.Errorf("failed to send response - %s", err)
                break
            }
        }
    }
    // ztd: an EOF means the client closed the connection
    p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
    conn.Close()
    close(client.ExitChan)
    if client.Channel != nil {
        client.Channel.RemoveClient(client.ID)
    }

    return err
}
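Everything the server writes back through p.Send / SendFramedResponse is framed: a 4-byte big-endian size, a 4-byte frame type, then the data (frame type 0 is a response, 1 an error, 2 a message). Below is a hedged sketch of reading one such frame on the client side; readFrame is a hypothetical helper of mine, go-nsq ships its own implementation of this.

package rawclient

import (
    "encoding/binary"
    "io"
)

// readFrame reads one [size][frameType][data] frame; size counts the
// 4-byte frame type plus the data that follows it.
func readFrame(r io.Reader) (frameType int32, data []byte, err error) {
    var size int32
    if err = binary.Read(r, binary.BigEndian, &size); err != nil {
        return
    }
    if err = binary.Read(r, binary.BigEndian, &frameType); err != nil {
        return
    }
    data = make([]byte, size-4)
    _, err = io.ReadFull(r, data)
    return
}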

Following the consumer example on the official site, I wrote a simple client. All it does is subscribe to a topic and channel, and whenever a producer sends a message to that channel, it prints the message to the screen. The hope is that walking through this interaction gives a better feel for the server, nsqd. The code is as follows:

package main

import (
    "fmt"

    nsq "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()

    c, err := nsq.NewConsumer("nsq", "consumer", config)
    if err != nil {
        fmt.Println("Failed to init consumer: ", err.Error())
        return
    }

    c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        fmt.Println("received message: ", string(m.Body))
        m.Finish()
        return nil
    }))

    err = c.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        fmt.Println("Failed to connect to nsqd: ", err.Error())
        return
    }

    <-c.StopChan
}
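To give this consumer something to print, it can be paired with a producer. The following is a hedged sketch using go-nsq's Producer; the topic "nsq" matches the consumer above, and the local nsqd address is an assumption.

package main

import (
    "fmt"

    nsq "github.com/nsqio/go-nsq"
)

func main() {
    config := nsq.NewConfig()

    p, err := nsq.NewProducer("127.0.0.1:4150", config)
    if err != nil {
        fmt.Println("Failed to init producer: ", err.Error())
        return
    }
    defer p.Stop()

    // publish one message to the topic the consumer subscribed to
    err = p.Publish("nsq", []byte("hello nsq"))
    if err != nil {
        fmt.Println("Failed to publish: ", err.Error())
    }
}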

There are two interactions with the server during ConnectToNSQD. The first:

resp, err := conn.Connect()

Inside Connect:

conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
    return nil, err
}
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn

_, err = c.Write(MagicV2)

Once the TCP connection is established, the client sends the protocol version to the server; exactly as we saw in the tcp handler, every new connection begins with the protocol magic.
The second interaction:

cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)

The client sends the server a command of the form "SUB topic channel".
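On the wire this is a single text line. Continuing the raw-connection sketch from the tcp handler section (same hypothetical conn, already past the "  V2" magic), it would look roughly like:

// hedged sketch: Subscribe(topic, channel) + WriteCommand boil down to one
// line, which IOLoop reads with ReadSlice('\n') and splits on spaces
fmt.Fprintf(conn, "SUB %s %s\n", "nsq", "consumer")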
Now let's look at how the server handles this command. In the method func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error), skipping over all the validation checks:

topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)

atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
// update message pump
client.SubEventChan <- channel

Besides adding the client to the channel and assigning that channel to the client, this also notifies the message pump. So it's time to look at what this message pump actually does.
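The client.SubEventChan <- channel handoff is the usual Go pattern of passing state to a long-running goroutine through a channel instead of mutating it under a lock. A tiny standalone illustration of that pattern (the names are mine, not nsqd's):

package main

import "fmt"

func main() {
    subEventChan := make(chan string, 1)
    done := make(chan struct{})

    // the "pump": idles until a subscription event arrives on the channel
    go func() {
        sub := <-subEventChan
        fmt.Println("pump now serving:", sub)
        close(done)
    }()

    // what SUB does, conceptually: hand the subscription to the pump
    subEventChan <- "nsq/consumer"
    <-done
}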

Back in the tcp handler, there is this code:

messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan

Stepping into protocolV2's messagePump:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var buf bytes.Buffer
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel
    // NOTE: `flusherChan` is used to bound message latency for
    // the pathological case of a channel on a low volume topic
    // with >1 clients having >1 RDY counts
    var flusherChan <-chan time.Time
    var sampleRate int32

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // v2 opportunistically buffers data to clients to reduce write system calls
    // we force flush in two cases:
    //    1. when the client is not ready to receive messages
    //    2. we're buffered and the channel has nothing left to send us
    //       (ie. we would block in this loop anyway)
    //
    flushed := true

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        // ztd: once subChannel is no longer nil, memoryMsgChan and
        // backendMsgChan get assigned
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // if this case wins, we're either starved
            // or we won the race between other channels...
            // in either case, force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        case <-client.ReadyStateChan:
        // ztd: in the SUB handler, client.SubEventChan <- channel is what
        // assigns this subChannel
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            // you can't IDENTIFY anymore
            identifyEventChan = nil

            outputBufferTicker.Stop()
            if identifyData.OutputBufferTimeout > 0 {
                outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
            }

            heartbeatTicker.Stop()
            heartbeatChan = nil
            if identifyData.HeartbeatInterval > 0 {
                heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
                heartbeatChan = heartbeatTicker.C
            }

            if identifyData.SampleRate > 0 {
                sampleRate = identifyData.SampleRate
            }

            msgTimeout = identifyData.MsgTimeout
        case <-heartbeatChan:
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.ctx.nsqd.logf("PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}
In this code, once a client subscribes to a channel, the pump starts listening on memoryMsgChan, waiting for a producer to send messages over. Now let's look at the PUB path:
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
    return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
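The handler above runs when a producer sends a PUB command. For reference, from a raw producer connection that command looks roughly like the hedged sketch below: the command line, then a 4-byte big-endian body length, then the body (conn is hypothetical, as in the earlier raw-client sketch, and encoding/binary is assumed to be imported).

// hedged sketch of the PUB wire format
body := []byte("hello nsq")
fmt.Fprintf(conn, "PUB %s\n", "nsq")
binary.Write(conn, binary.BigEndian, int32(len(body)))
conn.Write(body)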
After a series of checks, a message is put into the topic. Every time a new topic is created, the topic's own messagePump is started:
// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    t := &Topic{
        name:              topicName,
        channelMap:        make(map[string]*Channel),
        memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
        exitChan:          make(chan int),
        channelUpdateChan: make(chan int),
        ctx:               ctx,
        pauseChan:         make(chan bool),
        deleteCallback:    deleteCallback,
        idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
    }

    if strings.HasSuffix(topicName, "#ephemeral") {
        t.ephemeral = true
        t.backend = newDummyBackendQueue()
    } else {
        t.backend = diskqueue.New(topicName,
            ctx.nsqd.getOpts().DataPath,
            ctx.nsqd.getOpts().MaxBytesPerFile,
            int32(minValidMsgLength),
            int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
            ctx.nsqd.getOpts().SyncEvery,
            ctx.nsqd.getOpts().SyncTimeout,
            ctx.nsqd.getOpts().Logger)
    }

    t.waitGroup.Wrap(func() { t.messagePump() })

    t.ctx.nsqd.Notify(t)

    return t
}
In that messagePump, the topic's memoryMsgChan is watched:
for {
    select {
    case msg = <-memoryMsgChan:
Each time a message is received, it is broadcast to every channel under the topic:
for i, channel := range chans {
        chanMsg := msg
        // copy the message because each channel
        // needs a unique instance but...
        // fastpath to avoid copy if its the first channel
        // (the topic already created the first copy)
        if i > 0 {
            chanMsg = NewMessage(msg.ID, msg.Body)
            chanMsg.Timestamp = msg.Timestamp
            chanMsg.deferred = msg.deferred
        }
        if chanMsg.deferred != 0 {
            channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
            continue
        }
        err := channel.PutMessage(chanMsg)
        if err != nil {
            t.ctx.nsqd.logf(
                "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                t.name, msg.ID, channel.name, err)
        }
    }
If you know NSQ, you know that each topic broadcasts every message to all of its channels; this is where that logic lives. Note that the first channel reuses the message the topic already created, so only the remaining channels pay for a copy.
Channel's PutMessage (which calls put):
func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
This pushes the message into the channel's memoryMsgChan. At that point we are back in protocolV2's messagePump:
case msg := <-memoryMsgChan:
        if sampleRate > 0 && rand.Int31n(100) > sampleRate {
            continue
        }
        msg.Attempts++
        // ztd: for reliability, sending the message to a subscriber does not
        // actually delete it; it is kept in flight with a timeout instead
        subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
        
        client.SendingMessage()
        // ztd: send the message to the client
        err = p.SendMessage(client, msg, &buf)
        if err != nil {
            goto exit
        }
        flushed = false
In the client code, note the line `m.Finish()`. It tells the server that this message has been consumed and can be discarded, by sending a `FIN` command to the server (the raw command is sketched after the server-side code below). On the server side:
// FinishMessage successfully discards an in-flight message
 func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
    msg, err := c.popInFlightMessage(clientID, id)
    if err != nil {
        return err
    }
    c.removeFromInFlightPQ(msg)
    if c.e2eProcessingLatencyStream != nil {
        c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
    }
    return nil
}
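As for the FIN command itself, what m.Finish() puts on the wire is roughly the hedged sketch below; msgID stands for the 16-byte id carried in the message that was received, and conn is the same hypothetical subscribed connection from the earlier sketches.

// hedged sketch of the FIN wire format
fmt.Fprintf(conn, "FIN %s\n", msgID)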
Earlier, StartInFlightTimeout added the msg to two structures: the in-flight message queue (whose data structure is actually a map) and the inFlightPQ; FinishMessage removes it from both. The role of the inFlightPQ is explained below.
What if a message is not Finished in time? How are timed-out messages handled? In NSQD's Main function, a queueScanLoop is started:
n.waitGroup.Wrap(func() { n.queueScanLoop() })
Inside this loop, tickers are set up: the refresh ticker periodically re-reads the channel list and calls resizePool, while the work ticker periodically triggers a scan of the channels:
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }
resizePool starts the queueScanWorker goroutines, and each worker ends up calling this function on the channels it is handed:
func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        // ztd: take out one expired message, if any
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        dirty = true

        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        c.put(msg)
    }

exit:
    return dirty
}

This function keeps taking expired messages off the inFlightPQ and removing them from the in-flight message map. PeekAndShift:
func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
    if len(*pq) == 0 {
        return nil, 0
    }

    x := (*pq)[0]
    if x.pri > max {
        return nil, x.pri - max
    }
    pq.Pop()

    return x, 0
}

The inFlightPqueue is a min-heap; every push of a new message sifts it up:
func (pq *inFlightPqueue) up(j int) {
    for {
        i := (j - 1) / 2 // parent
        if i == j || (*pq)[j].pri >= (*pq)[i].pri {
            break
        }
        pq.Swap(i, j)
        j = i
    }
}
So the message at the top of the heap is the one with the nearest deadline; if even that one has not expired yet, then nothing has expired. If it has expired, it is popped. The for loop keeps popping expired messages this way until none are left.
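To make that concrete, here is a small standalone toy of mine, built on container/heap rather than nsqd's hand-rolled heap, that reproduces the PeekAndShift semantics: pri plays the role of the in-flight deadline, and an item only comes off the heap once that deadline has passed.

package main

import (
    "container/heap"
    "fmt"
    "time"
)

// item mimics the role of Message.pri: a deadline in UnixNano.
type item struct{ pri int64 }

type pqueue []*item

func (pq pqueue) Len() int            { return len(pq) }
func (pq pqueue) Less(i, j int) bool  { return pq[i].pri < pq[j].pri }
func (pq pqueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *pqueue) Push(x interface{}) { *pq = append(*pq, x.(*item)) }
func (pq *pqueue) Pop() interface{} {
    old := *pq
    n := len(old)
    it := old[n-1]
    *pq = old[:n-1]
    return it
}

// peekAndShift returns the top item only if its deadline has already passed.
func peekAndShift(pq *pqueue, max int64) *item {
    if pq.Len() == 0 {
        return nil
    }
    top := (*pq)[0]
    if top.pri > max {
        return nil // even the nearest deadline has not passed yet
    }
    return heap.Pop(pq).(*item)
}

func main() {
    now := time.Now().UnixNano()
    pq := &pqueue{}
    heap.Push(pq, &item{pri: now - int64(time.Second)}) // already expired
    heap.Push(pq, &item{pri: now + int64(time.Minute)}) // not expired yet

    // only the expired item is drained, mirroring processInFlightQueue's loop
    for it := peekAndShift(pq, now); it != nil; it = peekAndShift(pq, now) {
        fmt.Println("expired, deadline was", it.pri)
    }
}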