Nsq 原理分析(一)

Nsq 是用 go 語言開發的輕量級的分佈式消息隊列,適合小型項目使用、用來學習消息隊列實現原理、學習 golang channel知識以及如何用 go 來寫分佈式,爲何說適合小型小型項目使用由於,nsq 若是沒有能力進行二次開發的狀況存在的問題仍是不少的。linux


Nsq 模塊介紹

nsqd:是一個進程監聽了 http、tcp 兩種協議,用來建立 topic、channel,分發消息給消費者,向 nsqlooup 註冊本身的元數據信息(topic、channel、consumer),本身的服務信息,最核心模塊。git

nsqlookup:存儲了 nsqd 的元數據和服務信息(endpoind),向消費者提供服務發現功能,向 nsqadmin 提供數據查詢功能。github

nsqadmin:簡單的管理界面,展現了 topic、channel以及channel上的消費者,也能夠建立 topic、channel
nsq.gif
摘自官網
生產者向某個topic中發送消息,若是topic有一個或者多個channle,那麼該消息會被複制多分發送到每個channel中。相似 rabbitmq中的fanout類型,channle相似隊列。
官方說 nsq 是分佈式的消息隊列服務,可是在我看來只有channel到消費者這部分提現出來分佈式的感受,nsqd 這個模塊其實就是單點的,nsqd 將 topic、channel、以及消息都存儲在了本地磁盤,官方還建議一個生產者使用一個 nsqd,這樣不只浪費資源尚未數據備份的保障。一旦 nsqd 所在的主機磁損壞,數據都將丟失。golang

Nsq 源碼分析

先部署一個簡單的環境,以 centos 操做系統爲例sql

下載
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
解壓
tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
cd nsq-1.2.0.linux-amd64.go1.12.9/bin
cp * /bin

啓動三個終端,一個用來啓動 nsqadmin(管理界面)、nsqlookup(nsqd服務以及元數據管理)、nsqd(nsq核心模塊,元數據、消息存儲以及消息分發), ip 換成本身的真實ipbootstrap

終端1
/bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 -tcp-address 0.0.0.0:4152 -http-address 0.0.0.0:4153  --broadcast-address 192.168.1.1
終端2
/bin/nsqlookupd --broadcast-address 192.168.1.1:4160
終端3
/bin/nsqadmin --lookupd-http-address 192.168.1.1:4160

看一下 nsq 的簡單使用centos

cat producer.go
package main
import "github.com/nsqio/go-nsq"
config := nsq.NewConfig()
p, _ := nsq.NewProducer(addr, config)
err := p.Publish("topic", []byte("message"))
if err != nil {
    fmt.Printf("dispatch task failed %s", err)
}

cat consumer.go
package main
import "github.com/nsqio/go-nsq"

type MyHandler struct {}

func (h *MyHandler) HandleMessage(message *nsq.Message) error {
    fmt.Printf("consume message %+v\n", message)
}

config := nsq.NewConfig()
c, _ := nsq.NewConsumer("topic", "channel", config)
c.SetLoggerLevel(nsq.LogLevelDebug)
handler := &MyHandler{}
c.AddHandler(handler)
// 這裏端口是4161 是 nsqlookup 的 http 端口, nsqd 和 nsqlookup 都同時監聽了 tcp和http兩個協議
err := c.ConnectToNSQLookupd("192.168.1.1:4161")
if err != nil {
    fmt.Printf("Connect nsq lookup failed %+v\n", err)
}

1. 生產者代碼分析api

go-nsq/producer.go併發

// After Config is passed into NewProducer the values are no longer mutable (they are copied).
func NewProducer(addr string, config *Config) (*Producer, error) {
    err := config.Validate()
    if err != nil {
        return nil, err
    }

    p := &Producer{
        id: atomic.AddInt64(&instCount, 1),

        addr:   addr,
        config: *config,

        logger: make([]logger, int(LogLevelMax+1)),
        logLvl: LogLevelInfo,

        transactionChan: make(chan *ProducerTransaction),
        exitChan:        make(chan int),
        responseChan:    make(chan []byte),
        errorChan:       make(chan []byte),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index, _ := range p.logger {
        p.logger[index] = l
    }
    return p, nil
}

初始化了 Producer 的結構體app

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {   
    return w.sendCommand(Publish(topic, body))
}

指定要往哪一個 topic 中發送消息以及要發送的消息

// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
    var params = [][]byte{[]byte(topic)}
    return &Command{[]byte("PUB"), params, body}
}

封裝了命令

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    // 內部使用了異步發送的方式
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    // 等待異步發送完成
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)
    // 判斷有沒有和 nsqd 創建鏈接,已經創建跳過
    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }
    return nil
}

在上面這段代碼中依然沒有看到將 PUB command 發送給 nsqd進程的代碼, 咱們看一下那個 connect 函數

func (w *Producer) connect() error {
    w.guard.Lock()
    defer w.guard.Unlock()

    if atomic.LoadInt32(&w.stopFlag) == 1 {
        return ErrStopped
    }

    switch state := atomic.LoadInt32(&w.state); state {
    case StateInit:
    case StateConnected:
        return nil
    default:
        return ErrNotConnected
    }

    w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLoggerLevel(w.getLogLevel())
    format := fmt.Sprintf("%3d (%%s)", w.id)
    for index := range w.logger {
        w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
    }
    // 這個主要是消費者在使用。在消費者部分會詳細分析
    _, err := w.conn.Connect()
    if err != nil {
        w.conn.Close()
        w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
        return err
    }
    atomic.StoreInt32(&w.state, StateConnected)
    w.closeChan = make(chan int)
    w.wg.Add(1)
    // 生產者利用這個 goroutine 向 nsqd 發送命令和接收響應
    go w.router()

    return nil
}
func (w *Producer) router() {
    for {
        select {
        // 在上面的 sendCommandAsync 這個方法中只看到了將待發送的命令又包裝了一下扔到了一個 channel 中,這裏在監聽,以及將命令發送給nsqd
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
            // 接收 nsqd 的響應
        case data := <-w.responseChan:
            w.popTransaction(FrameTypeResponse, data)
        case data := <-w.errorChan:
            w.popTransaction(FrameTypeError, data)
        case <-w.closeChan:
            goto exit
        case <-w.exitChan:
            goto exit
        }
    }

exit:
    w.transactionCleanup()
    w.wg.Done()
    w.log(LogLevelInfo, "exiting router")
}

2. 消費者代碼分析

// NewConsumer creates a new instance of Consumer for the specified topic/channel
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into NewConsumer the values are no longer mutable (they are copied).
// 指定要監聽的訂閱的 topic 和 channel
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
    if err := config.Validate(); err != nil {
        return nil, err
    }

    if !IsValidTopicName(topic) {
        return nil, errors.New("invalid topic name")
    }

    if !IsValidChannelName(channel) {
        return nil, errors.New("invalid channel name")
    }

    r := &Consumer{
        id: atomic.AddInt64(&instCount, 1),

        topic:   topic,
        channel: channel,
        config:  *config,

        logger:      make([]logger, LogLevelMax+1),
        logLvl:      LogLevelInfo,
        maxInFlight: int32(config.MaxInFlight),

        incomingMessages: make(chan *Message),

        rdyRetryTimers:     make(map[string]*time.Timer),
        pendingConnections: make(map[string]*Conn),
        connections:        make(map[string]*Conn),

        lookupdRecheckChan: make(chan int, 1),

        rng: rand.New(rand.NewSource(time.Now().UnixNano())),

        StopChan: make(chan int),
        exitChan: make(chan int),
    }

    // Set default logger for all log levels
    l := log.New(os.Stderr, "", log.Flags())
    for index := range r.logger {
        r.logger[index] = l
    }

    r.wg.Add(1)
    // 由於nsq是推送push的方式消費消息,因此早消費者端會控制消費的速度,限流做用,能夠配置能夠自動更新
    go r.rdyLoop()
    return r, nil
}

初始化 Consumer結構體

初始化後須要添加消息處理函數 AddHandler

// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}

// AddConcurrentHandlers sets the Handler for messages received by this Consumer.  It
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    if atomic.LoadInt32(&r.connectedFlag) == 1 {
        panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        // 能夠設置併發
        go r.handlerLoop(handler)
    }
}

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        // 不斷的接收 nsqd 發送過來的請求, readloop這個死循環方法會向這個channel仍消息進來,後面咱們會說到
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }
       // 使用咱們添加的消息處理函數來消費消息
        err := handler.HandleMessage(message)
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }
       // 當一條消息處理完成是否從隊列中移除,至關於提交,默認消費完一條消息自動提交,能夠設置批量提交
        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {
            logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

func (r *Consumer) exit() {
    r.exitHandler.Do(func() {
        close(r.exitChan)
        r.wg.Wait()
        close(r.StopChan)
    })
}
// ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
//
// If it is the first to be added, it initiates an HTTP request to discover nsqd
// producers for the configured topic.
//
// A goroutine is spawned to handle continual polling.
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }
    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    if err := validatedLookupAddr(addr); err != nil {
        return err
    }

    atomic.StoreInt32(&r.connectedFlag, 1)

    r.mtx.Lock()
    for _, x := range r.lookupdHTTPAddrs {
        if x == addr {
            r.mtx.Unlock()
            return nil
        }
    }
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }

    return nil
}

消費者須要鏈接到nsqlookup,從nsqlookup中查詢到nsqd的服務信息,而後進行鏈接

// make an HTTP req to one of the configured nsqlookupd instances to discover
// which nsqd's provide the topic we are consuming.
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
    retries := 0

retry:
    endpoint := r.nextLookupdEndpoint()

    r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

    var data lookupResp
    err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    if err != nil {
        r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
        retries++
        if retries < 3 {
            r.log(LogLevelInfo, "retrying with next nsqlookupd")
            goto retry
        }
        return
    }

    var nsqdAddrs []string
    for _, producer := range data.Producers {
        broadcastAddress := producer.BroadcastAddress
        port := producer.TCPPort
        joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
        nsqdAddrs = append(nsqdAddrs, joined)
    }
    // apply filter
    if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
        nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    }
    // 獲取 nsqlookup中因此的nsqd信息,而後進行鏈接
    for _, addr := range nsqdAddrs {
        err = r.ConnectToNSQD(addr)
        if err != nil && err != ErrAlreadyConnected {
            r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
            continue
        }
    }
}

官方不建議消費者端直接鏈接nsqd,

// ConnectToNSQD takes a nsqd address to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically.  This method is useful when you want to connect to a single, local,
// instance.
func (r *Consumer) ConnectToNSQD(addr string) error {
    if atomic.LoadInt32(&r.stopFlag) == 1 {
        return errors.New("consumer stopped")
    }

    if atomic.LoadInt32(&r.runningHandlers) == 0 {
        return errors.New("no handlers")
    }

    atomic.StoreInt32(&r.connectedFlag, 1)
    // 初始化
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLoggerLevel(r.getLogLevel())
    format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
    for index := range r.logger {
        conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
    }
    r.mtx.Lock()
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]
    if ok || pendingOk {
        r.mtx.Unlock()
        return ErrAlreadyConnected
    }
    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }
    r.mtx.Unlock()

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

    cleanupConnection := func() {
        r.mtx.Lock()
        delete(r.pendingConnections, addr)
        r.mtx.Unlock()
        conn.Close()
    }
    // 進行鏈接,在分析生產者時看到過,這裏是consumer和nsqd創建了鏈接的地方
    resp, err := conn.Connect()
    if err != nil {
        cleanupConnection()
        return err
    }

    if resp != nil {
        if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
            r.log(LogLevelWarning,
                "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
                conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
        }
    }
    // consumer向nsqd發送訂閱命令,此時consumer會將本身註冊到nsqd中,更準確的說法是consumer將本身註冊到了topic下的channel的client列表中,有消息到來時channle會隨機向本身的客戶端列表發送消息
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
    if err != nil {
        cleanupConnection()
        return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
            conn, r.topic, r.channel, err.Error())
    }

    r.mtx.Lock()
    delete(r.pendingConnections, addr)
    r.connections[addr] = conn
    r.mtx.Unlock()

    // pre-emptive signal to existing connections to lower their RDY count
    for _, c := range r.conns() {
        r.maybeUpdateRDY(c)
    }

    return nil

go-nsq/conn.go

// Connect dials and bootstraps the nsqd connection
// (including IDENTIFY) and returns the IdentifyResponse
func (c *Conn) Connect() (*IdentifyResponse, error) {
    dialer := &net.Dialer{
        LocalAddr: c.config.LocalAddr,
        Timeout:   c.config.DialTimeout,
    }
    // 生產者或者消費者在這裏與 nsqd 創建 tcp 鏈接
    conn, err := dialer.Dial("tcp", c.addr)
    if err != nil {
        return nil, err
    }
    c.conn = conn.(*net.TCPConn)
    c.r = conn
    c.w = conn
    // 創建鏈接後先發送 4 字節信息表示使用哪一種協議,目前有 v1 和 v2兩種協議
    _, err = c.Write(MagicV2)
    if err != nil {
        c.Close()
        return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
    }
    // 告訴 nsqd 關於本身的一些基本信息,好比心跳間隔、處理消息的超時、client id 等等
    resp, err := c.identify()
    if err != nil {
        return nil, err
    }

    if resp != nil && resp.AuthRequired {
        if c.config.AuthSecret == "" {
            c.log(LogLevelError, "Auth Required")
            return nil, errors.New("Auth Required")
        }
        err := c.auth(c.config.AuthSecret)
        if err != nil {
            c.log(LogLevelError, "Auth Failed %s", err)
            return nil, err
        }
    }

    c.wg.Add(2)
    atomic.StoreInt32(&c.readLoopRunning, 1)
    // 這兩個 goroutine 很重要
    go c.readLoop()
    go c.writeLoop()
    return resp, nil
}
func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        if atomic.LoadInt32(&c.closeFlag) == 1 {
            goto exit
        }
        // 從 nsqd獲取消息
        frameType, data, err := ReadUnpackedResponse(c)
        if err != nil {
            if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
                goto exit
            }
            if !strings.Contains(err.Error(), "use of closed network connection") {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
            }
            goto exit
        }
        // 心跳檢測默認30s檢查一次,後面會細說一下這裏
        if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
            c.log(LogLevelDebug, "heartbeat received")
            c.delegate.OnHeartbeat(c)
            err := c.WriteCommand(Nop())
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            continue
        }

        switch frameType {
        // 處理相應信息
        case FrameTypeResponse:
            c.delegate.OnResponse(c, data)
            // 接收消息進行消費
        case FrameTypeMessage:
            msg, err := DecodeMessage(data)
            if err != nil {
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, err)
                goto exit
            }
            msg.Delegate = delegate
            msg.NSQDAddress = c.String()

            atomic.AddInt64(&c.messagesInFlight, 1)
            atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
             // 這裏將從nsqd那邊獲取到的消息扔到了一個channel中,這個channel就是上面 handlerloop死循環中在等待消息的channel
            c.delegate.OnMessage(c, msg)
        case FrameTypeError:
            c.log(LogLevelError, "protocol error - %s", data)
            c.delegate.OnError(c, data)
        default:
            c.log(LogLevelError, "IO error - %s", err)
            c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
        }
    }

exit:
    atomic.StoreInt32(&c.readLoopRunning, 0)
    // start the connection close
    messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
    if messagesInFlight == 0 {
        // if we exited readLoop with no messages in flight
        // we need to explicitly trigger the close because
        // writeLoop won't
        c.close()
    } else {
        c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
    }
    c.wg.Done()
    c.log(LogLevelInfo, "readLoop exiting")
}
func (c *Conn) writeLoop() {
    for {
        select {
        case <-c.exitChan:
            c.log(LogLevelInfo, "breaking out of writeLoop")
            // Indicate drainReady because we will not pull any more off msgResponseChan
            close(c.drainReady)
            goto exit
        case cmd := <-c.cmdChan:
            err := c.WriteCommand(cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", cmd, err)
                c.close()
                continue
            }
        case resp := <-c.msgResponseChan:
            // Decrement this here so it is correct even if we can't respond to nsqd
            msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

            if resp.success {
                c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                c.delegate.OnMessageFinished(c, resp.msg)
                c.delegate.OnResume(c)
            } else {
                c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                c.delegate.OnMessageRequeued(c, resp.msg)
                if resp.backoff {
                    c.delegate.OnBackoff(c)
                } else {
                    c.delegate.OnContinue(c)
                }
            }

            err := c.WriteCommand(resp.cmd)
            if err != nil {
                c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                c.close()
                continue
            }

            if msgsInFlight == 0 &&
                atomic.LoadInt32(&c.closeFlag) == 1 {
                c.close()
                continue
            }
        }
    }

exit:
    c.wg.Done()
    c.log(LogLevelInfo, "writeLoop exiting")
}

當消息處理完成consumer會經過writeloop向nsqd發送FIN 命令,告訴nsqd我有哪些消息消費完成能夠從隊列中移除了。
其實上面是go nsq這個客戶端的代碼,尚未看到 nsq自己的代碼,先總結一下。而後繼續看nsqd的代碼
生產者

  1. 生產者先初始化Producerj結構體,而後設置一些配置
  2. 生產者和nsqd創建tcp鏈接
  3. 協商版本
  4. 生產者啓動一個route協程,這個協程用來不斷的向nsqd發送PUB指令,同時攜帶消息

消費者

  1. 消費者初始化Consumer結構體
  2. 消費者經過nsqlookup和 nsqd 創建tcp鏈接,nsqd多是一個也多是多個
  3. 協商版本
  4. 創建鏈接後發送本身的識別信息給nsqd,攜帶一些基本配置信息,好比心跳間隔、消息消費超時、客戶端id等等
  5. 啓動RDY限流機制
  6. 啓動 readloop、writeloop
相關文章
相關標籤/搜索