The tcp handler handles each TCP connection:
```
type tcpServer struct {
	ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

	// The client should initialize itself by sending a 4 byte sequence indicating
	// the version of the protocol that it intends to communicate, this will allow us
	// to gracefully upgrade the protocol away from text/line oriented to whatever...
	// ztd: the first thing a client sends after establishing a connection is the
	// protocol version; judging from the code, only V2 is currently supported
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	// ztd: IOLoop is called to handle the client connection
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
		return
	}
}
```
IOLoop's handling of the TCP connection:
```
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)

	// synchronize the startup of messagePump in order
	// to guarantee that it gets a chance to initialize
	// goroutine local state derived from client attributes
	// and avoid a potential race with IDENTIFY (where a client
	// could have changed or disabled said attributes)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// ztd: commands are separated by spaces
		params := bytes.Split(line, separatorBytes)

		if p.ctx.nsqd.getOpts().Verbose {
			p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
		}

		var response []byte
		// ztd: execute the command; different commands dispatch to different
		// functions, which we will walk through below using a typical client
		response, err = p.Exec(client, params)
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	// ztd: receiving EOF means the client closed the connection
	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}
```
Following the consumer example on the official site, I wrote a simple client. All it does is subscribe to a topic and channel, and print each message to the screen when a producer sends one to that channel. The goal is to better understand the server (nsqd) by walking through this interaction. The client looks like this:
```
package main

import (
	"fmt"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	config := nsq.NewConfig()
	c, err := nsq.NewConsumer("nsq", "consumer", config)
	if err != nil {
		fmt.Println("Failed to init consumer: ", err.Error())
		return
	}

	c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		fmt.Println("received message: ", string(m.Body))
		m.Finish()
		return nil
	}))

	err = c.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		fmt.Println("Failed to connect to nsqd: ", err.Error())
		return
	}

	<-c.StopChan
}
```
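The walkthrough below assumes some producer is publishing to this topic. It is not part of the original example, but based on the standard go-nsq API, a minimal producer could look like this:

```
package main

import (
	"fmt"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	config := nsq.NewConfig()
	// assumes nsqd is listening on the default TCP port 4150
	p, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		fmt.Println("Failed to init producer: ", err.Error())
		return
	}
	defer p.Stop()

	// PUB a single message to the "nsq" topic; every channel of that topic
	// (including the consumer's "consumer" channel) receives a copy
	if err := p.Publish("nsq", []byte("hello nsq")); err != nil {
		fmt.Println("Failed to publish: ", err.Error())
	}
}
```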
During ConnectToNSQD there are two interactions with the server. The first:
```
resp, err := conn.Connect()
```
Inside Connect:
```
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
	return nil, err
}

c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn

_, err = c.Write(MagicV2)
```
Once the TCP connection is established, the client sends the protocol magic to the server, exactly as we saw in the tcp handler: the first thing the server reads on every new connection is the protocol version.
The second interaction:
```
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
```
The client sends the server a command of the form "SUB topic channel".
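To make the wire format concrete, here is a minimal hand-rolled sketch of those two interactions (host, port, topic and channel names are assumptions; error handling trimmed): the 4-byte magic followed by a line-oriented SUB command.

```
package main

import (
	"fmt"
	"net"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:4150")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// step 1: the 4-byte protocol magic "  V2" (two spaces followed by V2)
	conn.Write([]byte("  V2"))

	// step 2: a line-oriented command: "SUB <topic> <channel>\n"
	conn.Write([]byte("SUB nsq consumer\n"))

	// the server answers with a framed response (4-byte size, 4-byte frame type, body)
	buf := make([]byte, 256)
	n, _ := conn.Read(buf)
	fmt.Printf("raw response: %q\n", buf[:n])
}
```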
Now let's look at how the server handles this command.
In the method `func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)`, skipping over the various validation checks:
```
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)

atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
// update message pump
client.SubEventChan <- channel
```
Besides adding the client to the channel and assigning the channel to the client, this also updates the message pump. Time to look at what this message pump actually does.
In the tcp Handler, we saw this code:
```
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
```
Stepping into protocol_v2's messagePump:
```
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var buf bytes.Buffer
	var memoryMsgChan chan *Message
	var backendMsgChan chan []byte
	var subChannel *Channel
	// NOTE: `flusherChan` is used to bound message latency for
	// the pathological case of a channel on a low volume topic
	// with >1 clients having >1 RDY counts
	var flusherChan <-chan time.Time
	var sampleRate int32

	subEventChan := client.SubEventChan
	identifyEventChan := client.IdentifyEventChan
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout

	// v2 opportunistically buffers data to clients to reduce write system calls
	// we force flush in two cases:
	//    1. when the client is not ready to receive messages
	//    2. we're buffered and the channel has nothing left to send us
	//       (ie. we would block in this loop anyway)
	//
	flushed := true

	// signal to the goroutine that started the messagePump
	// that we've started up
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
			// ztd: as soon as subChannel is non-nil, memoryMsgChan and
			// backendMsgChan get assigned
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
		// ztd: the `client.SubEventChan <- channel` in the SUB handler is what
		// assigns this subChannel
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg, &buf)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg, &buf)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

exit:
	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.ctx.nsqd.logf("PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}
```

In this code, as soon as a client subscribes to a channel, the pump starts listening on memoryMsgChan, waiting for a producer to send messages. Next, let's look at the PUB path:
```
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
	return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
```
After a series of checks, it puts a message onto the topic. Every time a topic is created, a per-topic messagePump is started:
```
// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		ctx:               ctx,
		pauseChan:         make(chan bool),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
	}

	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		t.backend = diskqueue.New(topicName,
			ctx.nsqd.getOpts().DataPath,
			ctx.nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			ctx.nsqd.getOpts().Logger)
	}

	t.waitGroup.Wrap(func() { t.messagePump() })

	t.ctx.nsqd.Notify(t)

	return t
}
```
Inside this messagePump, it listens on the topic's memoryMsgChan:
```
for {
	select {
	case msg = <-memoryMsgChan:
```
Each time a message is received, it is broadcast to every channel under the topic:
```
for i, channel := range chans {
	chanMsg := msg
	// copy the message because each channel
	// needs a unique instance but...
	// fastpath to avoid copy if its the first channel
	// (the topic already created the first copy)
	if i > 0 {
		chanMsg = NewMessage(msg.ID, msg.Body)
		chanMsg.Timestamp = msg.Timestamp
		chanMsg.deferred = msg.deferred
	}
	if chanMsg.deferred != 0 {
		channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
		continue
	}
	err := channel.PutMessage(chanMsg)
	if err != nil {
		t.ctx.nsqd.logf(
			"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
			t.name, msg.ID, channel.name, err)
	}
}
```
If you already know nsq, you know that a topic broadcasts every message to all of its channels; this is where that logic lives. The channel's PutMessage:
```
func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
```
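The excerpt stops at the in-memory case; in nsqd the select also has a default branch that spills the message to the backend (disk) queue when memoryMsgChan is full. A minimal, self-contained sketch of that buffered-channel-with-overflow pattern (hypothetical names, not the actual nsqd code):

```
package main

import "fmt"

// backend stands in for nsqd's disk-backed queue (hypothetical, simplified).
type backend struct{ items [][]byte }

func (b *backend) Put(m []byte) error {
	b.items = append(b.items, m)
	return nil
}

// queue mirrors the shape of Channel.put: a bounded in-memory channel
// plus an overflow backend.
type queue struct {
	memoryMsgChan chan []byte
	backend       *backend
}

func (q *queue) put(m []byte) error {
	select {
	case q.memoryMsgChan <- m:
		// fast path: the message fits in the in-memory queue
		return nil
	default:
		// slow path: the in-memory queue is full, spill to the backend
		return q.backend.Put(m)
	}
}

func main() {
	q := &queue{memoryMsgChan: make(chan []byte, 1), backend: &backend{}}
	q.put([]byte("first"))  // goes to memory
	q.put([]byte("second")) // memory full, goes to the backend
	fmt.Println(len(q.memoryMsgChan), len(q.backend.items)) // 1 1
}
```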
The message ends up in the channel's memoryMsgChan. At this point the code is back in protocol_v2's messagePump:
```
case msg := <-memoryMsgChan:
	if sampleRate > 0 && rand.Int31n(100) > sampleRate {
		continue
	}
	msg.Attempts++

	// ztd: for reliability, sending a message to a subscriber does not actually
	// delete it; instead the message is kept in flight with a timeout attached
	subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
	client.SendingMessage()
	// ztd: send the message to the client
	err = p.SendMessage(client, msg, &buf)
	if err != nil {
		goto exit
	}
	flushed = false
```
Note the line `m.Finish()` in the client code: it tells the server that this message has been consumed and can be discarded. It does so by sending a `FIN` command to the server. On the server side:
```
// FinishMessage successfully discards an in-flight message
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	c.removeFromInFlightPQ(msg)
	if c.e2eProcessingLatencyStream != nil {
		c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
	}
	return nil
}
```
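On the wire, Finish just writes another line-oriented command, `FIN <message_id>\n`. Continuing the raw-protocol sketch above, with `msgID` standing for the 16-byte ID taken from a received message frame (hypothetical here):

```
// msgID is the 16-byte ASCII message ID from a previously received message frame
cmd := append([]byte("FIN "), msgID...)
cmd = append(cmd, '\n')
conn.Write(cmd)
```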
The author adds this msg to two queues: one is the in-flight message queue (structurally a map), the other is the inFlightPQ; the role of the inFlightPQ is covered below. If a message is never Finished in time, how are timed-out messages handled? In NSQD's Main function, a queueScanLoop is started:
```
n.waitGroup.Wrap(func() { n.queueScanLoop() })
```
In this loop, tickers are set up; every so often, resizePool is executed:
```
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}
```
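resizePool keeps a pool of scan-worker goroutines alive; queueScanLoop then hands channels to those workers over workCh, and each worker reports back whether its channel had expired ("dirty") work. A minimal, self-contained sketch of that dispatch pattern (hypothetical names, heavily simplified from the nsqd code):

```
package main

import (
	"fmt"
	"time"
)

// channelRef is a toy stand-in for *Channel.
type channelRef struct{ name string }

// scan stands in for processInFlightQueue; it returns true when the channel
// had expired ("dirty") messages. Here it always returns false.
func (c *channelRef) scan(now int64) bool { return false }

// startWorkers launches n workers that scan channels handed to them on workCh
// and report on responseCh whether anything expired.
func startWorkers(n int, workCh chan *channelRef, responseCh chan bool, closeCh chan struct{}) {
	for i := 0; i < n; i++ {
		go func() {
			for {
				select {
				case c := <-workCh:
					responseCh <- c.scan(time.Now().UnixNano())
				case <-closeCh:
					return
				}
			}
		}()
	}
}

func main() {
	workCh := make(chan *channelRef)
	responseCh := make(chan bool)
	closeCh := make(chan struct{})
	startWorkers(4, workCh, responseCh, closeCh)

	workCh <- &channelRef{name: "consumer"}
	fmt.Println("dirty:", <-responseCh)
	close(closeCh)
}
```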
The workers started by resizePool end up executing this function for each channel they are handed:
```
func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		// ztd: take out one expired message
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

exit:
	return dirty
}
```
This function keeps pulling expired messages out of the inFlightPQ, removing them from the in-flight message map, and re-queuing them with c.put so they will be delivered again. PeekAndShift:
```
func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
	if len(*pq) == 0 {
		return nil, 0
	}

	x := (*pq)[0]
	if x.pri > max {
		return nil, x.pri - max
	}
	pq.Pop()

	return x, 0
}
```
The inFlightPqueue is a min-heap; each time a new message is pushed:
```
func (pq *inFlightPqueue) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || (*pq)[j].pri >= (*pq)[i].pri {
			break
		}
		pq.Swap(i, j)
		j = i
	}
}
```
So the message at the top of the heap is the one that will expire soonest; if even that message has not expired, nothing has. If it has expired, it is popped. The for loop in processInFlightQueue therefore keeps popping expired messages until none are left.
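To see this peek-and-shift behaviour in isolation, here is a toy, self-contained version of the same idea (priorities stand for expiry timestamps; these are not the nsqd types):

```
package main

import (
	"container/heap"
	"fmt"
)

// item is a toy stand-in for an in-flight message: pri is its expiry time.
type item struct{ pri int64 }

// pqueue is a min-heap ordered by pri (smallest expiry on top).
type pqueue []*item

func (pq pqueue) Len() int            { return len(pq) }
func (pq pqueue) Less(i, j int) bool  { return pq[i].pri < pq[j].pri }
func (pq pqueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *pqueue) Push(x interface{}) { *pq = append(*pq, x.(*item)) }
func (pq *pqueue) Pop() interface{} {
	old := *pq
	n := len(old)
	x := old[n-1]
	*pq = old[:n-1]
	return x
}

// peekAndShift pops the top item only if it has already expired (pri <= now),
// mirroring the behaviour of inFlightPqueue.PeekAndShift.
func peekAndShift(pq *pqueue, now int64) *item {
	if pq.Len() == 0 {
		return nil
	}
	top := (*pq)[0]
	if top.pri > now {
		return nil // even the soonest-expiring message has not expired yet
	}
	return heap.Pop(pq).(*item)
}

func main() {
	pq := &pqueue{}
	heap.Init(pq)
	for _, pri := range []int64{30, 10, 20} {
		heap.Push(pq, &item{pri: pri})
	}

	now := int64(25)
	// drain everything that expired before "now": prints 10 then 20, keeps 30
	for msg := peekAndShift(pq, now); msg != nil; msg = peekAndShift(pq, now) {
		fmt.Println("expired:", msg.pri)
	}
}
```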