NSQ源碼-Nsq客戶端

看完lookupd和nsqd以後咱們再來看下nsq client端的代碼。 我是想把nsq系統完完整整的看一遍,從而對他造成一個更總體的
認識。對message queue來講他的client端就是生產者和消費者,生產者負責想nsq中投遞消息,消費者負責從lookupd中獲取到
指定nsqd以後,從nsqd中獲取消息。golang

生產者

咱們以nsq/apps/to_nsq/to_nsq.go爲例,客戶端這邊的代碼邏輯就簡單不少,NewProducer實例化一個instance,publish消息
到nsqd。api

/// nsq/apps/to_nsq/to_nsq.go
producer, err := nsq.NewProducer(addr, cfg)
err := producer.Publish(*topic, line)

下面來看下Publish裏的具體邏輯。bash

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
    // 生成具體的cmd
    return w.sendCommand(Publish(topic, body))
}
func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)

    if atomic.LoadInt32(&w.state) != StateConnected {
        // 這裏是一個lazily connect
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }

    return nil
}

在connect函數裏啓動了一個go routine去處理transactionChan對應的東西app

func (w *Producer) connect() error {
    w.closeChan = make(chan int)
    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
    _, err := w.conn.Connect()
    w.wg.Add(1)
    go w.router()

這裏須要注意一下, go-nsq/conn.go是對底層鏈接的一個抽象,他是不關心你是生產者仍是消費者,這裏使用到了
delegate 模式,conn.go收到消息的處理放到了producerConnDelegate和consumerConnDelegate中,而後通知到具體的
消費者活着生產者。函數

消費者

回過頭咱們再來看下消費者部分的代碼,client端咱們以nsq/apps/nsq_tail/nsq_tail.go爲例,代碼的基本邏輯以下:oop

// 1. new comsunmer instanace 
consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
// 2. add handler
consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})
// 3. connect to nsqd
consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
    log.Fatal(err)
}
// 4. connect to lookupd
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
    log.Fatal(err)
}
consumers = append(consumers, consumer)

下面來看下每一個部分的實際代碼:this

func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    if atomic.LoadInt32(&r.connectedFlag) == 1 {
        panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        go r.handlerLoop(handler)
    }
}

至此handler添加完成,起一個單獨的go routine來等待消息的到了。atom

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        message, ok := <-r.incomingMessages // 有新的消息的到來
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }

        err := handler.HandleMessage(message) // 調用以前註冊的handler
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }

        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

官方是不推薦只部署nqd而不部署lookupd的,咱們直接看下lookup的鏈接過程:3d

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    ...
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }
    return nil
}

在queryLookupd中先去查詢lookupd獲取最新的nqd地址,而後connect to nsqd.rest

func (r *Consumer) lookupdLoop() {
    // add some jitter so that multiple consumers discovering the same topic,
    // when restarted at the same time, dont all connect at once.
    ticker = time.NewTicker(r.config.LookupdPollInterval)
    // 每一個ticker interval更新nqd的地址信息
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }
}
func (r *Consumer) ConnectToNSQD(addr string) error {
    // 1. new connection
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLogger(logger, logLvl,
    fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))

    // 2. connection list
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]

    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
    // 3. new connect
    //   3.1 go c.readLoop()
    //   3.2 go c.writeLoop()
    resp, err := conn.Connect()
    
    // 4. sub to nsqd
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
}

以上就是客戶端初始化的一個流程,而後就是接受消息處理了。

->NewConsumer() // 新建一個consumer
->ConnectToNSQLookupds() // 鏈接到lookupd
  |->ConnectToNSQLookupd() // 鏈接到lookupd
     |->r.queryLookupd() // 查詢lookupd的
         |->apiRequestNegotiateV1() // 調用lookupd的rest api獲取nsqd消息
         |->ConnectToNSQD() // 鏈接到具體nsq
            |->NewConn() // 鏈接instance
            |->conn.Connect() // 開始鏈接
                  |->c.readLoop() // 與nqd鏈接read loop
                  |->c.writeLoop() // 與nqd鏈接write loop
            |->Subscribe() // consumer發送SUB command
     |->lookupdLoop() // 定時查詢lookupd並更新nsqd信息

注:

[1]. 關於delegate模式參考 這裏

相關文章
相關標籤/搜索