nsq消息隊列是怎麼實現消費者接收消息的?

1.建立消費者

經過c:=nsq.NewConsumer(...)方式建立消費者markdown

2.消費者註冊消息監聽

(1)給消費者c增長異步處理器handler: c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message)))異步

(2)AddHandler方法中調用AddConcurrentHandlers()tcp

(3)AddConcurrentHandlers()中開啓一個goroutine,調用handlerLoop(handler), handlerLoop中開啓無限循環,經過無緩衝通道incomingMessages,阻塞監聽最新消息, 獲取到消息,傳遞給回調handler.HandleMessage(message)處理oop

func (r *Consumer) AddHandler(handler Handler) {
	r.AddConcurrentHandlers(handler, 1)
}

func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
	for i := 0; i < concurrency; i++ {
		go r.handlerLoop(handler)
	}
}

func (r *Consumer) handlerLoop(handler Handler) {

	for {
		message, ok := <-r.incomingMessages
		if !ok {
			goto exit
		}

		if r.shouldFailMessage(message, handler) {
			message.Finish()
			continue
		}

		err := handler.HandleMessage(message)
                ...
             }

複製代碼
3.鏈接NSQLookupd

調用c.ConnectToNSQLookupd(v),該方法中會調用queryLookupd()->ConnectToNSQD()spa

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	
	...
	if numLookupd == 1 {
		r.queryLookupd()
		r.wg.Add(1)
		go r.lookupdLoop()
	}

	return nil
}

func (r *Consumer) queryLookupd() {
	..
	for _, addr := range nsqdAddrs {
		err = r.ConnectToNSQD(addr)
		...
	}
}
複製代碼

在ConnectToNSQD()中咱們能夠看到,會經過conn.Connect()創建跟nsqd的TCP鏈接code

func (r *Consumer) ConnectToNSQD(addr string) error {
	...
	resp, err := conn.Connect()
	...
	
}
複製代碼

在Connect會開啓一個goroutine,在readLoop方法中無限循環的監聽消息的到來orm

func (c *Conn) Connect() (*IdentifyResponse, error) {
	dialer := &net.Dialer{
		LocalAddr: c.config.LocalAddr,
		Timeout:   c.config.DialTimeout,
	}

	conn, err := dialer.Dial("tcp", c.addr)
	if err != nil {
		return nil, err
	}
	c.conn = conn.(*net.TCPConn)
	...
	go c.readLoop()
	go c.writeLoop()
	return resp, nil
}
複製代碼

當收到消息後,交給c.delegate.OnMessage()方法處理,在該方法中,會把消息發送給無緩衝消息通道incomingMessages,這樣就完成了整個接收消息的邏輯.string

func (c *Conn) readLoop() {
	delegate := &connMessageDelegate{c}
	for {
		...
		frameType, data, err := ReadUnpackedResponse(c)

		switch frameType {
		...
		case FrameTypeMessage:
			msg, err := DecodeMessage(data)
			...
			c.delegate.OnMessage(c, msg)
                        ...
                   }
		...
}

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
	...
	r.incomingMessages <- msg
	...
}
複製代碼
相關文章
相關標籤/搜索