nsq 是用go語言實現的分佈式隊列。閱讀源碼對go語言的chanel,分佈式有着更好的理解html
核心代碼分位3部分:node
官方的介紹爲python
nsqd is the daemon that receives, queues, and delivers messages to clients.git
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).github
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.golang
大意爲:nsqd是接收,分發隊列信息的守護進程。通常集羣化運行,也能夠獨自部署。sql
下面對nsqd的2個邏輯作一次學習json
在Makefile中,寫到api
$(BLDDIR)/nsqd: $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go internal/*/*.go)
能夠找到nsqd的代碼入口在apps/nsqd/nsqd.go
緩存
apps/nsqd/nsqd.go
這個文件做爲程序入口,主要作了幾件事情:
首先做者使用svc包來控制程序的啓動:
type program struct { nsqd *nsqd.NSQD } func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } } func (p *program) Init(env svc.Environment) error {...} func (p *program) Start() error {...} func (p *program) Stop() error {...}
使用svc 能更簡潔的保證程序乾淨的退出。在nsqd中,退出信號有兩個:SIGINT(輸入任意健) 和 SIGTERM(kill)。
Start()函數是主要邏輯的入口,在函數中引用了NewOptions(),它會建立一個默認的Options 結構。Options 後續會做爲nsqd啓動的參數來源
opts := nsqd.NewOptions()
做者經過flag包實現了命令行參數接收,若是命令行中執行配置文件,會同時讀取配置文件。根據配置文件,命令行參數,來建立一個nsqd結構
options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts)
接下來會加載數據
err := nsqd.LoadMetadata() err = nsqd.PersistMetadata()
LoadMetadata()過程爲:
PersistMetadata()過程爲:
接下來調用啓動nsqd的主邏輯nsqd.Main(),主要完成如下過程
n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) }
這裏使用到了waitGroup,它是一個groutines 的控制包,能上線相似python 的join()功能。能夠實現全部groutines都執行完再退出。
做者封裝了waitGroup庫
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() }
Add() 會計數器加1,Done()使得計數器減一。此外WaitGroup提供Wait()函數:當計數器歸0時,繼續執行,不然阻塞。等待線程執行完再退出的做用。
此外,將函數做爲參數,再在內部groutines執行,和python的裝飾器的用法相似。
回到Main()函數中,啓動http_api利用到了github.com/nsqio/nsq/internal/http_api
包, 設置router等參數後,啓動。
queueScanLoop() 是管道掃進程,他的邏輯是將tpic,channel中的數據讀入到worker channel, 並每隔必定的時間更新worker數量,掃描chanel中的數據。
select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit }
這裏使用select來監聽io操做,每隔掃描間隔時,判斷channel中的是否存在數據須要處理,若是沒有,則略過本次掃描。
每隔刷新間隔判斷worker數量是否發生變化。
loop: numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop }
這裏還有dirty比率的概念,channel中有數據就認爲是dirty,當該比率超過配置中的值時,則繼續處理調用worker來處理,而不是等待固定間隔才進行掃描。
啓動lookupLoop()和statsdLoop();這兩個函數的做用初步看和nsqdlookup通訊用,細節還未了解。
上面闡述了nsqd的啓動邏輯。nsqd使用http api和用戶交互
在api文檔中,看到pub接口用來發布信息:
使用示例curl -d "<message>" http://127.0.0.1:4151/pub?topic=name
在nsqd/http.go中,定義了路由規則
func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer { ... s := &httpServer{ ctx: ctx, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, } router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) ... }
在doPUB()函數中,能夠看到數據存儲時,最終調用了opic.PutMessage(msg)
err = topic.PutMessage(msg)
func (t *Topic) PutMessage(m *Message) error { t.RLock() defer t.RUnlock() if atomic.LoadInt32(&t.exitFlag) == 1 { return errors.New("exiting") } err := t.put(m) if err != nil { return err } atomic.AddUint64(&t.messageCount, 1) return nil }
PutMessage的邏輯是作併發控制(加鎖)後,調Topic.put(*Message) 來寫入信息。
這裏有兩個鎖控制機制:
RLoclk
go語言中,sync包有兩種鎖,分別是互斥鎖sync.Mutex和讀寫鎖sync.RWMutex。
type Mutex func (m *Mutex) Lock() func (m *Mutex) Unlock() type RWMutex func (rw *RWMutex) Lock() func (rw *RWMutex) RLock() func (rw *RWMutex) RLocker() Locker func (rw *RWMutex) RUnlock() func (rw *RWMutex) Unlock()
互斥鎖傾向於在全局使用,一旦加鎖,就必須解鎖以後才能訪問。不二次加鎖、二次解鎖都會報錯。
讀寫鎖用在讀遠多於寫的場景。
Lock()表示寫加鎖,加寫鎖前,若是已經存在寫鎖,或者其餘讀鎖,會阻塞住,直到鎖可用。已阻塞的 Lock 調用會從得到的鎖中排除新的讀取器,即寫鎖權限高於讀鎖,有寫鎖時優先進行寫鎖定。
RLock()表示讀加鎖,當有寫鎖時,沒法加載讀鎖,當只有讀鎖或者沒有鎖時,能夠加載讀鎖,讀鎖能夠加載多個,因此適用於"讀多寫少"的場景。
關於讀寫鎖的具體例子請參考golang中sync.RWMutex和sync.Mutex區別
atomic
atomic是sync包中的另外一種鎖機制,在實現上,它比互斥鎖層級更低:互斥鎖調用的是golang的api,而atomic是在內核層面實現。所以它比互斥鎖效率更高,可是使用上也存在必定的限制。若是使用存儲相關接口,存入的是nil,或者類型不對,會報錯。
此外,在一些文章中,以及stack overflow中都提到儘可能少用atomic,具體緣由還不知道。
atomic有幾種常見的函數:
具體查看atomic介紹
在上面的PutMessage邏輯中,增長topic讀鎖和topic中的部分值的原子操做鎖後,調用了put()函數來實現寫入。
func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err) return err } } return nil }
put函數的操做是,將Message寫入channel,若是該topic的memoryMsgChan長度滿了,則經過default邏輯,寫入buffer中.
buffer的實現是利用了sync.Pool包,至關因而一塊緩存,在gc前釋放,存儲的長度受限於內存大小。
這裏有兩個問題:
通過查找,發現處理上述兩個channel的函數是messagePump,而messagePump在建立一個新Topic時會被後臺調用:
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { ... t.waitGroup.Wrap(func() {t.messagePump()}) ... }
func (t *Topic) messagePump() { ... if len(chans) > 0 { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } select { case msg = <-memoryMsgChan: case buf = <-backendChan: msg, err = decodeMessage(buf) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } ... } ... for i, channel := range chans { chanMsg := msg if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } ... err := channel.PutMessage(chanMsg) ... } ... }
上述調用了channel的PutMessage()完成了message寫入channel的memoryMsgChan中,寫入邏輯和寫入topic邏輯相似。到這裏完成了數據的寫入流程分析。
官方的介紹以下
nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.
大意爲:nsqlookup是管理nsqd集羣拓補信息的守護進程。nsqlookup用於
下面梳理一下nsqllookup的兩個邏輯:
根據查詢數據的過程進行梳理,nsq提供了幾個封裝好的查詢接口,若是nsq_tail、nsq_to_file 等。此處從nsq_til 舉例查看。
nsq_tail中主要邏輯以下:
consumers := []*nsq.Consumer{} for i := 0; i < len(topics); i += 1 { fmt.Printf("Adding consumer for topic: %s\n", topics[i]) consumer, err := nsq.NewConsumer(topics[i], *channel, cfg) if err != nil { log.Fatal(err) } consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages}) err = consumer.ConnectToNSQDs(nsqdTCPAddrs) if err != nil { log.Fatal(err) } err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs) if err != nil { log.Fatal(err) } consumers = append(consumers, consumer) }
nsq_tail的邏輯是針對每一個topic,分別初始化一個消費者consumer, 此處consumer實現的庫是go-nsq。
並實現一個nsq_tail邏輯的handler,初始化在consumer中。
以後從nsqd和nsqdlookup中獲取數據,並調用handler處理。
在go-nsq/consumer.go中,ConnectToNSQLookupd()會調用queryLookupd()和lookupdLoop(),而lookupdLoop()又會按期調用queryLookupd()。代碼以下:
func (r *Consumer) ConnectToNSQLookupd(addr string) error { ... if numLookupd == 1 { r.queryLookupd() r.wg.Add(1) go r.lookupdLoop() } ... } func (r *Consumer) lookupdLoop() { ... for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } ... } // make an HTTP req to one of the configured nsqlookupd instances to discover // which nsqd's provide the topic we are consuming. // // initiate a connection to any new producers that are identified. func (r *Consumer) queryLookupd() { ... var nsqdAddrs []string for _, producer := range data.Producers { broadcastAddress := producer.BroadcastAddress port := producer.TCPPort joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port)) nsqdAddrs = append(nsqdAddrs, joined) } // apply filter if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok { nsqdAddrs = discoveryFilter.Filter(nsqdAddrs) } for _, addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) if err != nil && err != ErrAlreadyConnected { r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err) continue } } }
在queryLookupd()中,獲取到生產者信息後,調用ConnectToNSQD()鏈接每一個nsqd server。用ConnectToNSQD()實現了讀取message。
ConnectYpNSQD()調用了connection結果的函數readLoop()。
func (c *Conn) readLoop() { for { ... frameType, data, err := ReadUnpackedResponse(c) ... switch frameType { case FrameTypeResponse: c.delegate.OnResponse(c, data) case FrameTypeMessage: msg, err := DecodeMessage(data) if err != nil { c.log(LogLevelError, "IO error - %s", err) c.delegate.OnIOError(c, err) goto exit } msg.Delegate = delegate msg.NSQDAddress = c.String() atomic.AddInt64(&c.rdyCount, -1) atomic.AddInt64(&c.messagesInFlight, 1) atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano()) c.delegate.OnMessage(c, msg) ... } }
在c.delegate.OnMessage(c, msg)中,會將message寫入Consumer.incomingMessages。完成數據讀取。