看完lookupd和nsqd以後咱們再來看下nsq client端的代碼。 我是想把nsq系統完完整整的看一遍,從而對他造成一個更總體的
認識。對message queue來講他的client端就是生產者和消費者,生產者負責想nsq中投遞消息,消費者負責從lookupd中獲取到
指定nsqd以後,從nsqd中獲取消息。golang
咱們以nsq/apps/to_nsq/to_nsq.go爲例,客戶端這邊的代碼邏輯就簡單不少,NewProducer實例化一個instance,publish消息
到nsqd。api
/// nsq/apps/to_nsq/to_nsq.go producer, err := nsq.NewProducer(addr, cfg) err := producer.Publish(*topic, line)
下面來看下Publish裏的具體邏輯。bash
// Publish synchronously publishes a message body to the specified topic, returning // an error if publish failed func (w *Producer) Publish(topic string, body []byte) error { // 生成具體的cmd return w.sendCommand(Publish(topic, body)) }
func (w *Producer) sendCommand(cmd *Command) error { doneChan := make(chan *ProducerTransaction) err := w.sendCommandAsync(cmd, doneChan, nil) if err != nil { close(doneChan) return err } t := <-doneChan return t.Error }
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, args []interface{}) error { // keep track of how many outstanding producers we're dealing with // in order to later ensure that we clean them all up... atomic.AddInt32(&w.concurrentProducers, 1) defer atomic.AddInt32(&w.concurrentProducers, -1) if atomic.LoadInt32(&w.state) != StateConnected { // 這裏是一個lazily connect err := w.connect() if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil }
在connect函數裏啓動了一個go routine去處理transactionChan對應的東西app
func (w *Producer) connect() error { w.closeChan = make(chan int) w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) _, err := w.conn.Connect() w.wg.Add(1) go w.router()
這裏須要注意一下, go-nsq/conn.go是對底層鏈接的一個抽象,他是不關心你是生產者仍是消費者,這裏使用到了
delegate 模式,conn.go收到消息的處理放到了producerConnDelegate和consumerConnDelegate中,而後通知到具體的
消費者活着生產者。函數
回過頭咱們再來看下消費者部分的代碼,client端咱們以nsq/apps/nsq_tail/nsq_tail.go爲例,代碼的基本邏輯以下:oop
// 1. new comsunmer instanace consumer, err := nsq.NewConsumer(topics[i], *channel, cfg) // 2. add handler consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages}) // 3. connect to nsqd consumer.ConnectToNSQDs(nsqdTCPAddrs) if err != nil { log.Fatal(err) } // 4. connect to lookupd err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs) if err != nil { log.Fatal(err) } consumers = append(consumers, consumer)
下面來看下每一個部分的實際代碼:this
func (r *Consumer) AddHandler(handler Handler) { r.AddConcurrentHandlers(handler, 1) }
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) { if atomic.LoadInt32(&r.connectedFlag) == 1 { panic("already connected") } atomic.AddInt32(&r.runningHandlers, int32(concurrency)) for i := 0; i < concurrency; i++ { go r.handlerLoop(handler) } }
至此handler添加完成,起一個單獨的go routine來等待消息的到了。atom
func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages // 有新的消息的到來 if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) // 調用以前註冊的handler if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } }
官方是不推薦只部署nqd而不部署lookupd的,咱們直接看下lookup的鏈接過程:3d
func (r *Consumer) ConnectToNSQLookupd(addr string) error { ... r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr) numLookupd := len(r.lookupdHTTPAddrs) r.mtx.Unlock() // if this is the first one, kick off the go loop if numLookupd == 1 { r.queryLookupd() r.wg.Add(1) go r.lookupdLoop() } return nil }
在queryLookupd中先去查詢lookupd獲取最新的nqd地址,而後connect to nsqd.rest
func (r *Consumer) lookupdLoop() { // add some jitter so that multiple consumers discovering the same topic, // when restarted at the same time, dont all connect at once. ticker = time.NewTicker(r.config.LookupdPollInterval) // 每一個ticker interval更新nqd的地址信息 for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } }
func (r *Consumer) ConnectToNSQD(addr string) error { // 1. new connection conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) // 2. connection list _, pendingOk := r.pendingConnections[addr] _, ok := r.connections[addr] r.pendingConnections[addr] = conn if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 { r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr) } r.log(LogLevelInfo, "(%s) connecting to nsqd", addr) // 3. new connect // 3.1 go c.readLoop() // 3.2 go c.writeLoop() resp, err := conn.Connect() // 4. sub to nsqd cmd := Subscribe(r.topic, r.channel) err = conn.WriteCommand(cmd) }
以上就是客戶端初始化的一個流程,而後就是接受消息處理了。
->NewConsumer() // 新建一個consumer ->ConnectToNSQLookupds() // 鏈接到lookupd |->ConnectToNSQLookupd() // 鏈接到lookupd |->r.queryLookupd() // 查詢lookupd的 |->apiRequestNegotiateV1() // 調用lookupd的rest api獲取nsqd消息 |->ConnectToNSQD() // 鏈接到具體nsq |->NewConn() // 鏈接instance |->conn.Connect() // 開始鏈接 |->c.readLoop() // 與nqd鏈接read loop |->c.writeLoop() // 與nqd鏈接write loop |->Subscribe() // consumer發送SUB command |->lookupdLoop() // 定時查詢lookupd並更新nsqd信息
[1]. 關於delegate模式參考 這裏