nsq源碼review:go-nsq producer

總述

nsq是一個實時分佈式的消息隊列平臺。git

核心部分是一個叫nsqd的模塊,它負責接收和轉發消息。同時在go-nsq的包中,提供了consumer和producer的核心接口。在讀nsq源碼的時候,很好奇它的數據是怎麼從producer給到了consumer的,因而從源碼的層面梳理了一下代碼的實現細節。這部分先記錄一下producer和consumer的代碼細節,方便後續再查看相關代碼。後面準備把nsqd和nsqdlookup相關的東西記錄一下,包括數據分發、數據緩存、服務發現等實現細節。github

go-nsq裏的producer和consumer實現的功能就是一句話,提供消息接受和分發的接口。可是它內部的實現確頗有意思。數組

nsq demo

學習源碼仍是要先從demo起步,首先裝好nsq,能夠git下來源碼編譯或者執行下載的二進制文件。先寫個producer.go,以下緩存

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	cfg := nsq.NewConfig()
	r := []byte("hello consumer")
	addr := "127.0.0.1:4150"
	p, err := nsq.NewProducer(addr, cfg)
	if err != nil {
		log.Print(err)
	}
	err = p.Publish("serving123", r)
	if err != nil {
		log.Println(err)
	}
}
複製代碼

例子裏,我建立了一個producer,而後向serving123的topic裏發送消息。bash

下面是consumer的代碼,consumer.go併發

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"math/rand"
	"os"
	"time"
)

type SimpleHandler struct {
}

func (sh *SimpleHandler) HandleMessage(m *nsq.Message) error {
	_, err := os.Stdout.Write(m.Body)
	if err != nil {
		fmt.Println(err)
	}
	return nil
}

func main() {
	pause := make(chan bool)
	caddr := "127.0.0.1:4150"
	cfg := nsq.NewConfig()
	channel := fmt.Sprintf("tail%06d#ephemeral", rand.Int()%999999)
	c, _ := nsq.NewConsumer("serving123", channel, cfg)
	c.AddHandler(&SimpleHandler{})
	c.ConnectToNSQD(caddr)
	<-pause
}
複製代碼

consumer,我定義了一個SimpleHandler,它實現了HandleMessage的接口,功能就是向標準輸出打印消息。 main函數裏,我新建了一個想要消費serving123這個topic的consumer,而後鏈接上nsqd服務。app

運行

直接執行nsqd啓一個服務。先執行consumer,而後執行producer,能夠看到consumer裏打印出的消息。異步

producer源碼解析

完成上面的demo以後,先來看一下producer是怎麼玩的。分佈式

producer實例,會去調用Publish方法去發佈消息,這個方法接收了topic和message body。函數

func (w *Producer) Publish(topic string, body []byte) error {
	return w.sendCommand(Publish(topic, body))
}
複製代碼

內部的另一個Publish方法執行結果做爲參數傳入sendCommandPublish方法,構造了一個Command的三元消息體

func Publish(topic string, body []byte) *Command {
	var params = [][]byte{[]byte(topic)}
	return &Command{[]byte("PUB"), params, body}
}
複製代碼

sendCommand方法裏,初始化一個名叫doneChan的ProducerTransaction的指針,而後,cmd消息三元體,和doneChan一塊兒傳入sendCommandAsync方法裏。最後監聽doneChan的管道輸出,而後返回error。

func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)
	err := w.sendCommandAsync(cmd, doneChan, nil)
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan
	return t.Error
}
複製代碼

ProducerTransaction結構體頗有意思,它持有一個本身相同類型的指針。目的是將最終的內容本身保存起來。在最終clearup的時候,它會調用一個finish方法去觸發上面的chan監聽返回,從而最終返回退出sendCommand

type ProducerTransaction struct {
	cmd      *Command
	doneChan chan *ProducerTransaction
	Error    error         // the error (or nil) of the publish command
	Args     []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
}
複製代碼

這個finish方法會把ProducerTransaction它本身自己進行傳遞,也就是在t.doneChan <- t這裏

func (t *ProducerTransaction) finish() {
	if t.doneChan != nil {
		t.doneChan <- t
	}
}
複製代碼

看一下sendCommandAsync函數。首先,它會去調用原子操做atomic.AddInt32去記錄併發producer數量。上面說的doneChan指針,做爲一個新的ProducerTransaction的參數傳入,最終這個新的ProducerTransaction的數據傳給了w.transactionChan監聽的channel。

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
	args []interface{}) error {
	// keep track of how many outstanding producers we're dealing with // in order to later ensure that we clean them all up... atomic.AddInt32(&w.concurrentProducers, 1) defer atomic.AddInt32(&w.concurrentProducers, -1) if atomic.LoadInt32(&w.state) != StateConnected { err := w.connect()//創建和nsqd的鏈接 if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil } 複製代碼

創建和nsqd的鏈接,以及發送數據都寫在w.connect()。瞭解了上面channel異步操做,看下這個connect函數

func (w *Producer) connect() error {
	w.guard.Lock()
	defer w.guard.Unlock()

	if atomic.LoadInt32(&w.stopFlag) == 1 {
		return ErrStopped
	}

	switch state := atomic.LoadInt32(&w.state); state {
	case StateInit:
	case StateConnected:
		return nil
	default:
		return ErrNotConnected
	}

	w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

	logger, logLvl := w.getLogger()

	w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
	w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))

	_, err := w.conn.Connect()
	if err != nil {
		w.conn.Close()
		w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
		return err
	}
	atomic.StoreInt32(&w.state, StateConnected)
	w.closeChan = make(chan int)
	w.wg.Add(1)
	go w.router()

	return nil
}
複製代碼

函數開始部分很好理解,就是進行一些狀態驗證。_, err := w.conn.Connect()這裏會實際去創建和nsqd的鏈接,會在裏面跑一個readLoop和一個writeloop去進行讀寫的相關操做,東西比較多就再也不贅述。這裏關心的是這個w.router方法。

router方法裏面開了一個for循環來監聽producer的channel,包括transactionChan、responseChan、errorChan、closeChan、exitChan,若是是w.transactionChan,則把這個transaction塞進producer的transactions數組裏,而後向conn裏去寫消息即向nsqd發送數據。若是是收到了返回信號或者是錯誤信號,則會彈出一個transaction。若是收到關閉或者是退出信號,則到exit裏面,清理全部transaction,並退出。

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
	w.log(LogLevelInfo, "exiting router")
}
複製代碼

popTransaction方法,會把transactions第一個元素彈出,保存剩下的元素。而後回去調用這個彈出的ProducerTransactionfinish()方法,也就是上面說的finish()方法。上面的sendCommand方法會受到通知退出,這樣就完成了消息的發佈過程。

func (w *Producer) popTransaction(frameType int32, data []byte) {
	t := w.transactions[0]
	w.transactions = w.transactions[1:]
	if frameType == FrameTypeError {
		t.Error = ErrProtocol{string(data)}
	}
	t.finish()
}
複製代碼

總結

限於篇幅,consumer相關準備起另一篇去寫,consumer和producer在和nsqd通訊的地方會複用一些代碼。

關於nsq的producer,咱們能學習到的是它channel的使用技巧,以及進行數據傳輸的時候,是如何運載數據和如何監聽和通知channel。核心的部分在於ProducerTransaction這個結構體,它負責了消息的運載。在上面提到的readLoop和writeloop,裏面有許多代理的方法也值得關注和學習,具體內容能夠本身看一眼。

相關文章
相關標籤/搜索