nsq源碼學習

nsq源碼學習

簡介

nsq 是用go語言實現的分佈式隊列。閱讀源碼對go語言的chanel,分佈式有着更好的理解html

代碼結構

核心代碼分位3部分:node

  • nsqd:隊列數據存儲
  • nsqlookup:管理nsqd節點,服務發現
  • nsqadmin:nsq的可視化

nsqd

官方的介紹爲python

nsqd is the daemon that receives, queues, and delivers messages to clients.git

It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).github

It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.golang

大意爲:nsqd是接收,分發隊列信息的守護進程。通常集羣化運行,也能夠獨自部署。sql

下面對nsqd的2個邏輯作一次學習json

  1. 啓動邏輯
  2. 數據存儲

啓動邏輯

在Makefile中,寫到api

$(BLDDIR)/nsqd:        $(wildcard apps/nsqd/*.go       nsqd/*.go       nsq/*.go internal/*/*.go)

能夠找到nsqd的代碼入口在apps/nsqd/nsqd.go緩存

apps/nsqd/nsqd.go

這個文件做爲程序入口,主要作了幾件事情:

  • 接收命令行參數
  • 根據命令行參數,新建nsqd結構
  • 啓動nsqd

首先做者使用svc包來控制程序的啓動:

type program struct {
    nsqd *nsqd.NSQD
}

func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

func (p *program) Init(env svc.Environment) error {...}

func (p *program) Start() error {...}

func (p *program) Stop() error {...}

使用svc 能更簡潔的保證程序乾淨的退出。在nsqd中,退出信號有兩個:SIGINT(輸入任意健) 和 SIGTERM(kill)。

Start()函數是主要邏輯的入口,在函數中引用了NewOptions(),它會建立一個默認的Options 結構。Options 後續會做爲nsqd啓動的參數來源

opts := nsqd.NewOptions()

做者經過flag包實現了命令行參數接收,若是命令行中執行配置文件,會同時讀取配置文件。根據配置文件,命令行參數,來建立一個nsqd結構

options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

接下來會加載數據

err := nsqd.LoadMetadata()
err = nsqd.PersistMetadata()

LoadMetadata()過程爲:

  1. 先使用atomic庫加鎖
  2. 讀取以node id的文件,以及默認文件,比對兩者,並從文件中獲取數據
  3. 將數據json 解析出meta 結構
  4. 遍歷meta,獲取topic name以及chanel name,對須要暫停的topic/chanel 進行暫停操做

PersistMetadata()過程爲:

  1. 根據nsqd 結構獲取對應的topic和channel
  2. 將topic和channel 持久化到文件中

接下來調用啓動nsqd的主邏輯nsqd.Main(),主要完成如下過程

  1. 根據options 參數監聽tcp端口,http端口,https端口
  2. 啓動4個goroutines分別實啓動http api,queueScanLoop,lookupLoop,statsdLoop
n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
    })

    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

這裏使用到了waitGroup,它是一個groutines 的控制包,能上線相似python 的join()功能。能夠實現全部groutines都執行完再退出。

做者封裝了waitGroup庫

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

Add() 會計數器加1,Done()使得計數器減一。此外WaitGroup提供Wait()函數:當計數器歸0時,繼續執行,不然阻塞。等待線程執行完再退出的做用。

此外,將函數做爲參數,再在內部groutines執行,和python的裝飾器的用法相似。

回到Main()函數中,啓動http_api利用到了github.com/nsqio/nsq/internal/http_api包, 設置router等參數後,啓動。

queueScanLoop() 是管道掃進程,他的邏輯是將tpic,channel中的數據讀入到worker channel, 並每隔必定的時間更新worker數量,掃描chanel中的數據。

select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

這裏使用select來監聽io操做,每隔掃描間隔時,判斷channel中的是否存在數據須要處理,若是沒有,則略過本次掃描。

每隔刷新間隔判斷worker數量是否發生變化。

loop:
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }

這裏還有dirty比率的概念,channel中有數據就認爲是dirty,當該比率超過配置中的值時,則繼續處理調用worker來處理,而不是等待固定間隔才進行掃描。

啓動lookupLoop()和statsdLoop();這兩個函數的做用初步看和nsqdlookup通訊用,細節還未了解。

上面闡述了nsqd的啓動邏輯。nsqd使用http api和用戶交互

數據存儲

api文檔中,看到pub接口用來發布信息:

使用示例
curl -d "<message>" http://127.0.0.1:4151/pub?topic=name

在nsqd/http.go中,定義了路由規則

func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
    ...
    s := &httpServer{
        ctx:         ctx,
        tlsEnabled:  tlsEnabled,
        tlsRequired: tlsRequired,
        router:      router,
    }
    router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
    ...
}

在doPUB()函數中,能夠看到數據存儲時,最終調用了opic.PutMessage(msg)

err = topic.PutMessage(msg)
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    return nil
}

PutMessage的邏輯是作併發控制(加鎖)後,調Topic.put(*Message) 來寫入信息。

這裏有兩個鎖控制機制:

  1. RLock
  2. atomic

RLoclk

go語言中,sync包有兩種鎖,分別是互斥鎖sync.Mutex和讀寫鎖sync.RWMutex。

type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()

type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()

互斥鎖傾向於在全局使用,一旦加鎖,就必須解鎖以後才能訪問。不二次加鎖、二次解鎖都會報錯。

讀寫鎖用在讀遠多於寫的場景。

Lock()表示寫加鎖,加寫鎖前,若是已經存在寫鎖,或者其餘讀鎖,會阻塞住,直到鎖可用。已阻塞的 Lock 調用會從得到的鎖中排除新的讀取器,即寫鎖權限高於讀鎖,有寫鎖時優先進行寫鎖定。

RLock()表示讀加鎖,當有寫鎖時,沒法加載讀鎖,當只有讀鎖或者沒有鎖時,能夠加載讀鎖,讀鎖能夠加載多個,因此適用於"讀多寫少"的場景。

關於讀寫鎖的具體例子請參考golang中sync.RWMutex和sync.Mutex區別

atomic

atomic是sync包中的另外一種鎖機制,在實現上,它比互斥鎖層級更低:互斥鎖調用的是golang的api,而atomic是在內核層面實現。所以它比互斥鎖效率更高,可是使用上也存在必定的限制。若是使用存儲相關接口,存入的是nil,或者類型不對,會報錯。

此外,在一些文章中,以及stack overflow中都提到儘可能少用atomic,具體緣由還不知道。

atomic有幾種常見的函數:

  1. CAS:比較和存儲,若是是等於舊的值,就將新的值寫入
  2. 增長或減小
  3. 讀取或寫入

具體查看atomic介紹

在上面的PutMessage邏輯中,增長topic讀鎖和topic中的部分值的原子操做鎖後,調用了put()函數來實現寫入。

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

put函數的操做是,將Message寫入channel,若是該topic的memoryMsgChan長度滿了,則經過default邏輯,寫入buffer中.

buffer的實現是利用了sync.Pool包,至關因而一塊緩存,在gc前釋放,存儲的長度受限於內存大小。

這裏有兩個問題:

  1. 存入memoryMsgChan就算完成topic寫入了嗎
  2. buffer中的數據怎麼辦

通過查找,發現處理上述兩個channel的函數是messagePump,而messagePump在建立一個新Topic時會被後臺調用:

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    ...
    t.waitGroup.Wrap(func() {t.messagePump()})
    ...
}
func (t *Topic) messagePump() {
    ...
    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }
    select {
        case msg = <-memoryMsgChan:
        case buf = <-backendChan:
            msg, err = decodeMessage(buf)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }    
        ...
    }
    ...
    for i, channel := range chans {
        chanMsg := msg 
        if i > 0 {
                chanMsg = NewMessage(msg.ID, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }        
        ...
        err := channel.PutMessage(chanMsg)
        ...
    }
    ...
}

上述調用了channel的PutMessage()完成了message寫入channel的memoryMsgChan中,寫入邏輯和寫入topic邏輯相似。到這裏完成了數據的寫入流程分析。

nsqlookup

官方的介紹以下

nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.

There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

大意爲:nsqlookup是管理nsqd集羣拓補信息的守護進程。nsqlookup用於

  1. 供客戶端查詢,得到具體的topic和channel
  2. nsqd節點將本身的信息廣播給nsqloookup。

下面梳理一下nsqllookup的兩個邏輯:

  1. 供客戶端查詢具體的topic數據
  2. 接收nsqd的廣播。

查詢topic和channel

根據查詢數據的過程進行梳理,nsq提供了幾個封裝好的查詢接口,若是nsq_tail、nsq_to_file 等。此處從nsq_til 舉例查看。

nsq_tail中主要邏輯以下:

consumers := []*nsq.Consumer{}
    for i := 0; i < len(topics); i += 1 {
        fmt.Printf("Adding consumer for topic: %s\n", topics[i])

        consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
        if err != nil {
            log.Fatal(err)
        }

        consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})

        err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
        if err != nil {
            log.Fatal(err)
        }

        err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
        if err != nil {
            log.Fatal(err)
        }

        consumers = append(consumers, consumer)
    }

nsq_tail的邏輯是針對每一個topic,分別初始化一個消費者consumer, 此處consumer實現的庫是go-nsq。
並實現一個nsq_tail邏輯的handler,初始化在consumer中。

以後從nsqd和nsqdlookup中獲取數據,並調用handler處理。

在go-nsq/consumer.go中,ConnectToNSQLookupd()會調用queryLookupd()和lookupdLoop(),而lookupdLoop()又會按期調用queryLookupd()。代碼以下:

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    ...
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }
    ...
}
func (r *Consumer) lookupdLoop() {
    ...
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }
    ...
}

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
    ...
    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
        nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
    
}

在queryLookupd()中,獲取到生產者信息後,調用ConnectToNSQD()鏈接每一個nsqd server。用ConnectToNSQD()實現了讀取message。

ConnectYpNSQD()調用了connection結果的函數readLoop()。

func (c *Conn) readLoop() {
    for {
        ...
        frameType, data, err :=      ReadUnpackedResponse(c)
        ...
        switch frameType {
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            msg.Delegate = delegate
            msg.NSQDAddress = c.String()

            atomic.AddInt64(&c.rdyCount, -1)
            atomic.AddInt64(&c.messagesInFlight, 1)
            atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())

            c.delegate.OnMessage(c, msg)
            ...
    }
}

在c.delegate.OnMessage(c, msg)中,會將message寫入Consumer.incomingMessages。完成數據讀取。

相關文章
相關標籤/搜索