經過c:=nsq.NewConsumer(...)方式建立消費者markdown
(1)給消費者c增長異步處理器handler: c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message)))異步
(2)AddHandler方法中調用AddConcurrentHandlers()tcp
(3)AddConcurrentHandlers()中開啓一個goroutine,調用handlerLoop(handler), handlerLoop中開啓無限循環,經過無緩衝通道incomingMessages,阻塞監聽最新消息, 獲取到消息,傳遞給回調handler.HandleMessage(message)處理oop
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
func (r *Consumer) handlerLoop(handler Handler) {
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
...
}
複製代碼
調用c.ConnectToNSQLookupd(v),該方法中會調用queryLookupd()->ConnectToNSQD()spa
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
...
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
return nil
}
func (r *Consumer) queryLookupd() {
..
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
...
}
}
複製代碼
在ConnectToNSQD()中咱們能夠看到,會經過conn.Connect()創建跟nsqd的TCP鏈接code
func (r *Consumer) ConnectToNSQD(addr string) error {
...
resp, err := conn.Connect()
...
}
複製代碼
在Connect會開啓一個goroutine,在readLoop方法中無限循環的監聽消息的到來orm
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
...
go c.readLoop()
go c.writeLoop()
return resp, nil
}
複製代碼
當收到消息後,交給c.delegate.OnMessage()方法處理,在該方法中,會把消息發送給無緩衝消息通道incomingMessages,這樣就完成了整個接收消息的邏輯.string
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
...
frameType, data, err := ReadUnpackedResponse(c)
switch frameType {
...
case FrameTypeMessage:
msg, err := DecodeMessage(data)
...
c.delegate.OnMessage(c, msg)
...
}
...
}
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
...
r.incomingMessages <- msg
...
}
複製代碼