看完了nsqlookupd咱們繼續往下看, nsqd纔是他的核心. 裏面大量的使用到了go channel, 相信看完以後對你學習go有很大的幫助.相較於lookupd部分不管在代碼邏輯和實現上都要複雜不少.
不過基本的代碼結構基本上都是同樣的, 進程使用go-srv來管理, Main裏啓動一個http sever和一個tcp server, 這裏能夠參考下以前文章的進程模型小節, 不過在nsqd中會啓動另外的兩個goroutine queueScanLoop和lookupLoop。下面是一個
具體的進程模型。
後面的分析都是基於這個進程模型。golang
啓動時序這塊兒大致上和lookupd中的一致, 咱們下面來看看lookupLoop和queueScanLoop.
lookupLoop代碼見nsqd/lookup.go中 主要作如下幾件事情:redis
因爲設計到了nsq裏的in-flight/deferred message, 咱們把queueScanLoop放到最後來看.算法
下面咱們就經過一條message的生命週期來看下nsqd的工做原理. 根據官方的QuickStart, 咱們能夠經過curl來pub一條消息.sql
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
咱們就跟着代碼看一下, 首先是http對此的處理:api
// nsq/nsqd/http.go func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { ... reqParams, topic, err := s.getTopicFromQuery(req) // 從http query中拿到topic信息 ... }
// nsq/nsqd/http.go func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) { reqParams, err := url.ParseQuery(req.URL.RawQuery) topicNames, ok := reqParams["topic"] return reqParams, s.ctx.nsqd.GetTopic(topicName), nil }
// nsq/nsqd/nsqd.go // GetTopic performs a thread safe operation // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { // 1. 首先查看n.topicMap,確認該topic是否已經存在(存在直接返回) t, ok := n.topicMap[topicName] // 2. 不然將新建一個topic t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t // 3. 查看該nsqd是否設置了lookupd, 從lookupd獲取該tpoic的channel信息 // 這個topic/channel已經經過nsqlookupd的api添加上去的, 可是nsqd的本地 // 尚未, 針對這種狀況咱們須要建立該channel對應的deffer queue和inFlight // queue. lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) } // now that all channels are added, start topic messagePump // 對該topic的初始化已經完成下面就是message t.Start() return t }
在上面消息初始化完成以後就啓動了tpoic對應的messagePumpbash
// nsq/nsqd/topic.go // messagePump selects over the in-memory and backend queue and // writes messages to every channel for this topic func (t *Topic) messagePump() { // 1. do not pass messages before Start(), but avoid blocking Pause() // or GetChannel() // 等待channel相關的初始化完成,GetTopic中最後的t.Start()才正式啓動該Pump // 2. main message loop // 開始從Memory chan或者disk讀取消息 // 若是topic對應的channel發生了變化,則更新channel信息 // 3. 往該tpoic對應的每一個channel寫入message(若是是deffermessage // 的話放到對應的deffer queue中 // 不然放到該channel對應的memoryMsgChan中)。 }
至此也就完成了從tpoic memoryMsgChan收到消息投遞到channel memoryMsgChan的投遞, 咱們先看下http
收到消息到通知pump處理的過程。dom
// nsq/nsqd/http.go func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { ... msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} } return "OK", nil }
// nsq/nsqd/topic.go // PutMessage writes a Message to the queue func (t *Topic) PutMessage(m *Message) error { t.RLock() defer t.RUnlock() if atomic.LoadInt32(&t.exitFlag) == 1 { return errors.New("exiting") } err := t.put(m) if err != nil { return err } atomic.AddUint64(&t.messageCount, 1) return nil }
func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
這裏memoryMsgChan的大小咱們能夠經過--mem-queue-size參數來設置,上面這段代碼的流程是若是memoryMsgChan尚未滿的話
就把消息放到memoryMsgChan中,不然就放到backend(disk)中。topic的mesasgePump檢測到有新的消息寫入的時候就開始工做了,
從memoryMsgChan/backend(disk)讀取消息投遞到channel對應的chan中。 還有一點請注意就是messagePump中curl
if len(chans) > 0 && !t.IsPaused() { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() }
這段代碼只有channel(此channel非golang裏的channel而是nsq的channel相似nsq_to_file)存在的時候纔會去投遞。上面部分就是
msg從producer生產消息到吧消息寫到memoryChan/Disk的過程,下面咱們來看下consumer消費消息的過程。tcp
首先是consumer從nsqlookupd查詢到本身所感興趣的topic/channel的nsqd信息, 而後就是來鏈接了。ide
對新的client的處理
//nsq/internal/protocol/tcp_server.go func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) { go handler.Handle(clientConn) }
//nsq/nsqd/tcp.go func (p *tcpServer) Handle(clientConn net.Conn) { prot.IOLoop(clientConn) }
針對每一個client起一個messagePump吧msg從上面channel對應的chan 寫入到consumer側
//nsq/nsqd/protocol_v2.go func (p *protocolV2) IOLoop(conn net.Conn) error { client := newClientV2(clientID, conn, p.ctx) p.ctx.nsqd.AddClient(client.ID, client) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) // read the request line, err = client.Reader.ReadSlice('\n') response, err = p.Exec(client, params) p.Send(client, frameTypeResponse, response) }
//nsq/nsqd/protocol_v2.go func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) } }
//nsq/nsqd/protocol_v2.go func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { var channel *Channel topic := p.ctx.nsqd.GetTopic(topicName) channel = topic.GetChannel(channelName) channel.AddClient(client.ID, client) // 通知messagePump開始工做 client.SubEventChan <- channel
通知topic的messagePump開始工做
func (t *Topic) GetChannel(channelName string) *Channel { t.Lock() channel, isNew := t.getOrCreateChannel(channelName) t.Unlock() if isNew { // update messagePump state select { case t.channelUpdateChan <- 1: case <-t.exitChan: } } return channel }
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { for { if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... // 等待client ready,而且channel的初始化完成 flushed = true } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // if this case wins, we're either starved // or we won the race between other channels... // in either case, force flush case <-client.ReadyStateChan: case subChannel = <-subEventChan: // you can't SUB anymore // channel初始化完成,pump開始工做 subEventChan = nil case identifyData := <-identifyEventChan: // you can't IDENTIFY anymore case <-heartbeatChan: // heartbeat的處理 case b := <-backendMsgChan: // 1. decode msg // 2. 把msg push到Flight Queue裏 // 3. send msg to client case msg := <-memoryMsgChan: // 1. 把msg push到Flight Queue裏 // 2. send msg to client case <-client.ExitChan: // exit the routine } }
至此咱們看的代碼就是一條消息從pub到nsqd中到被消費者處理的過程。不過得注意一點,咱們在上面的代碼分析中,建立
topic/channel的部分放到了message Pub的鏈上, 若是是沒有lookupd的模式的話這部分是在client SUB鏈上的。
在NSQ內部經過
type NSQD struct { topicMap map[string]*Topic } 和 type Topic struct { channelMap map[string]*Channel }
來維護一個內部的topic/channel狀態,而後在提供了以下的接口來管理topic和channel
/topic/create - create a new topic /topic/delete - delete a topic /topic/empty - empty a topic /topic/pause - pause message flow for a topic /topic/unpause - unpause message flow for a topic /channel/create - create a new channel /channel/delete - delete a channel /channel/empty - empty a channel /channel/pause - pause message flow for a channel /channel/unpause - unpause message flow for a channel
create topic/channel的話咱們在以前的代碼看過了,這裏能夠重點看下topic/channel delete的時候怎樣保證數據優雅的刪除的,以及
messagePump的退出機制。
// queueScanLoop runs in a single goroutine to process in-flight and deferred // priority queues. It manages a pool of queueScanWorker (configurable max of // QueueScanWorkerPoolMax (default: 4)) that process channels concurrently. // // It copies Redis's probabilistic expiration algorithm: it wakes up every // QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount // (default: 20) channels from a locally cached list (refreshed every // QueueScanRefreshInterval (default: 5s)). // // If either of the queues had work to do the channel is considered "dirty". // // If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty, // the loop continues without sleep.
這裏的註釋已經說的很明白了,queueScanLoop就是經過動態的調整queueScanWorker的數目來處理
in-flight和deffered queue的。在具體的算法上的話參考了redis的隨機過時算法。
閱讀源碼就是走走停停的過程,從一開始的無從下手到後面的一點點的把它啃透。一開始都以爲很困難,無從下手。之前也是嘗試着去看一些
經典的開源代碼,但都沒能堅持下來,有時候人大概是會高估本身的能力的,好多東西自覺得看個一兩遍就能看懂,其實否則,
好多知識只有不斷的去研究你才能參透其中的原理。
必定要持續的讀,否則過幾天以後就忘了前面讀的內容 必定要多總結, 總結就是在不斷的讀的過程,從第一遍讀通到你把它表述出來至少須要再讀5-10次 多思考,這段時間在地鐵上/跑步的時候我會迴向一下其中的流程 分享(讀懂是一個層面,寫出來是一個層面,講給別人聽是另一個層面)
後面我會先看下go-nsqd部分的代碼,以後會研究下gnatsd, 兩個都是cloud native的消息系統,看下有啥區別。