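NSQ is a lightweight distributed message queue written in Go. It suits small projects, and it is a good vehicle for learning how a message queue is implemented, how Go channels are used in practice, and how to write distributed software in Go. Why only small projects? Because unless you are able to modify and extend NSQ yourself, it has quite a few problems.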
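nsqd: the core module. It is a process that listens on both HTTP and TCP; it creates topics and channels, dispatches messages to consumers, and registers its metadata (topics and channels) and its own service information with nsqlookupd.
nsqlookupd: stores nsqd metadata and service information (endpoints), provides service discovery to consumers, and answers data queries from nsqadmin.
nsqadmin: a simple web UI that shows topics, channels, and the consumers on each channel; it can also create topics and channels.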
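(Excerpted from the official website.)
A producer publishes messages to a topic. If the topic has one or more channels, each message is copied and delivered to every channel. This is similar to the fanout exchange type in RabbitMQ, with a channel playing the role of a queue.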
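A toy model of the fanout semantics just described may help: a topic copies each message to every channel, and each channel hands a given message to exactly one of its consumers. The `topic` and `channel` types below are hypothetical and model nothing of nsqd's internals; in particular, the round-robin choice of consumer is only there to keep the demo deterministic, it is not how nsqd picks a client.

```go
package main

import "fmt"

// channel is a hypothetical stand-in for an NSQ channel: a queue with consumers.
type channel struct {
	name      string
	consumers []string
	next      int // round-robin cursor, used here only to keep the demo deterministic
}

// deliver hands msg to exactly one consumer of the channel.
func (c *channel) deliver(msg string) string {
	consumer := c.consumers[c.next%len(c.consumers)]
	c.next++
	return fmt.Sprintf("%s/%s <- %s", c.name, consumer, msg)
}

// topic copies every published message to all of its channels (fanout).
type topic struct {
	channels []*channel
}

func (t *topic) publish(msg string) []string {
	var out []string
	for _, c := range t.channels {
		out = append(out, c.deliver(msg))
	}
	return out
}

func main() {
	t := &topic{channels: []*channel{
		{name: "archive", consumers: []string{"a1"}},
		{name: "metrics", consumers: []string{"m1", "m2"}},
	}}
	for _, msg := range []string{"msg1", "msg2"} {
		for _, line := range t.publish(msg) {
			fmt.Println(line)
		}
	}
}
```

Both channels receive both messages, while within `metrics` the two messages go to different consumers: fanout across channels, load sharing within a channel.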
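Officially, NSQ is a distributed message queue, but in my view only the channel-to-consumer part really feels distributed. nsqd itself is a single point: it keeps topics, channels, and messages locally (messages are held in memory and spill to local disk once the in-memory queue, sized by `--mem-queue-size`, is full), and the official docs even recommend running one nsqd alongside each producer. That not only wastes resources but also provides no data replication: if the disk of the host running nsqd fails, the data is lost.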
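First, deploy a simple environment, using CentOS as an example.

    # download
    wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
    # extract
    tar xvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
    cd nsq-1.2.0.linux-amd64.go1.12.9/bin
    cp * /bin

Open three terminals to start nsqadmin (the admin UI), nsqlookupd (manages nsqd service information and metadata), and nsqd (the core NSQ module: metadata, message storage, and message dispatch). Replace the IP with your machine's real IP.

    # terminal 1: nsqd (default ports are 4150/4151; this setup uses 4152/4153)
    /bin/nsqd --lookupd-tcp-address 192.168.1.1:4160 -tcp-address 0.0.0.0:4152 -http-address 0.0.0.0:4153 --broadcast-address 192.168.1.1
    # terminal 2: nsqlookupd (TCP 4160 for nsqd, HTTP 4161 for clients and nsqadmin)
    /bin/nsqlookupd --broadcast-address 192.168.1.1
    # terminal 3: nsqadmin (points at nsqlookupd's HTTP port)
    /bin/nsqadmin --lookupd-http-address 192.168.1.1:4161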
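Before writing any client code, it can help to confirm the daemons are up. nsqd and nsqlookupd both expose an HTTP `/ping` endpoint that answers `OK` when healthy. The `ping` and `checkPing` helpers below are hypothetical, and the addresses assume the nsqd HTTP port 4153 and nsqlookupd's default HTTP port 4161 from the commands above.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// checkPing validates a /ping reply: a healthy daemon answers 200 with body "OK".
func checkPing(status int, body []byte) error {
	if status != http.StatusOK || strings.TrimSpace(string(body)) != "OK" {
		return fmt.Errorf("unhealthy: status %d, body %q", status, body)
	}
	return nil
}

// ping GETs baseURL+"/ping" with a short timeout and validates the reply.
func ping(baseURL string) error {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(baseURL + "/ping")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	return checkPing(resp.StatusCode, body)
}

func main() {
	// nsqd HTTP port from the command above, then nsqlookupd's default HTTP port.
	for _, base := range []string{"http://192.168.1.1:4153", "http://192.168.1.1:4161"} {
		if err := ping(base); err != nil {
			fmt.Printf("%s: %v\n", base, err)
			continue
		}
		fmt.Printf("%s: OK\n", base)
	}
}
```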
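A quick look at basic NSQ usage:

producer.go:

    package main

    import (
        "fmt"

        "github.com/nsqio/go-nsq"
    )

    func main() {
        // nsqd TCP address (matches -tcp-address above)
        addr := "192.168.1.1:4152"
        config := nsq.NewConfig()
        p, err := nsq.NewProducer(addr, config)
        if err != nil {
            fmt.Printf("create producer failed %s\n", err)
            return
        }
        defer p.Stop()

        err = p.Publish("topic", []byte("message"))
        if err != nil {
            fmt.Printf("dispatch task failed %s\n", err)
        }
    }

consumer.go:

    package main

    import (
        "fmt"

        "github.com/nsqio/go-nsq"
    )

    type MyHandler struct{}

    func (h *MyHandler) HandleMessage(message *nsq.Message) error {
        fmt.Printf("consume message %+v\n", message)
        // returning nil lets go-nsq FIN (acknowledge) the message
        return nil
    }

    func main() {
        config := nsq.NewConfig()
        c, err := nsq.NewConsumer("topic", "channel", config)
        if err != nil {
            fmt.Printf("create consumer failed %+v\n", err)
            return
        }
        c.SetLoggerLevel(nsq.LogLevelDebug)
        c.AddHandler(&MyHandler{})

        // Port 4161 is nsqlookupd's HTTP port; nsqd and nsqlookupd both listen on TCP and HTTP
        err = c.ConnectToNSQLookupd("192.168.1.1:4161")
        if err != nil {
            fmt.Printf("Connect nsq lookup failed %+v\n", err)
            return
        }
        // block until the consumer is stopped
        <-c.StopChan
    }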
1. Producer code analysis
go-nsq/producer.go
    // After Config is passed into NewProducer the values are no longer mutable (they are copied).
    func NewProducer(addr string, config *Config) (*Producer, error) {
        err := config.Validate()
        if err != nil {
            return nil, err
        }

        p := &Producer{
            id: atomic.AddInt64(&instCount, 1),

            addr:   addr,
            config: *config,

            logger: make([]logger, int(LogLevelMax+1)),
            logLvl: LogLevelInfo,

            transactionChan: make(chan *ProducerTransaction),
            exitChan:        make(chan int),
            responseChan:    make(chan []byte),
            errorChan:       make(chan []byte),
        }

        // Set default logger for all log levels
        l := log.New(os.Stderr, "", log.Flags())
        for index, _ := range p.logger {
            p.logger[index] = l
        }
        return p, nil
    }
This initializes the Producer struct.
    // Publish synchronously publishes a message body to the specified topic, returning
    // an error if publish failed
    func (w *Producer) Publish(topic string, body []byte) error {
        return w.sendCommand(Publish(topic, body))
    }
The caller specifies the topic to publish to and the message body to send.
    // Publish creates a new Command to write a message to a given topic
    func Publish(topic string, body []byte) *Command {
        var params = [][]byte{[]byte(topic)}
        return &Command{[]byte("PUB"), params, body}
    }
This wraps the topic and body into a PUB command.
    func (w *Producer) sendCommand(cmd *Command) error {
        doneChan := make(chan *ProducerTransaction)
        // internally this sends asynchronously
        err := w.sendCommandAsync(cmd, doneChan, nil)
        if err != nil {
            close(doneChan)
            return err
        }
        // wait for the asynchronous send to complete
        t := <-doneChan
        return t.Error
    }
    func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
        args []interface{}) error {
        // keep track of how many outstanding producers we're dealing with
        // in order to later ensure that we clean them all up...
        atomic.AddInt32(&w.concurrentProducers, 1)
        defer atomic.AddInt32(&w.concurrentProducers, -1)

        // connect to nsqd unless a connection already exists
        if atomic.LoadInt32(&w.state) != StateConnected {
            err := w.connect()
            if err != nil {
                return err
            }
        }

        t := &ProducerTransaction{
            cmd:      cmd,
            doneChan: doneChan,
            Args:     args,
        }

        select {
        case w.transactionChan <- t:
        case <-w.exitChan:
            return ErrStopped
        }

        return nil
    }
The code above still does not show where the PUB command is actually sent to the nsqd process, so let's look at the connect function.
    func (w *Producer) connect() error {
        w.guard.Lock()
        defer w.guard.Unlock()

        if atomic.LoadInt32(&w.stopFlag) == 1 {
            return ErrStopped
        }

        switch state := atomic.LoadInt32(&w.state); state {
        case StateInit:
        case StateConnected:
            return nil
        default:
            return ErrNotConnected
        }

        w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

        w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
        w.conn.SetLoggerLevel(w.getLogLevel())
        format := fmt.Sprintf("%3d (%%s)", w.id)
        for index := range w.logger {
            w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), format)
        }

        // Conn.Connect matters mostly on the consumer side; it is analyzed in detail there
        _, err := w.conn.Connect()
        if err != nil {
            w.conn.Close()
            w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
            return err
        }
        atomic.StoreInt32(&w.state, StateConnected)
        w.closeChan = make(chan int)
        w.wg.Add(1)
        // the producer uses this goroutine to send commands to nsqd and receive responses
        go w.router()

        return nil
    }
    func (w *Producer) router() {
        for {
            select {
            // sendCommandAsync above only wrapped the command and pushed it onto a
            // channel; here we receive from that channel and send the command to nsqd
            case t := <-w.transactionChan:
                w.transactions = append(w.transactions, t)
                err := w.conn.WriteCommand(t.cmd)
                if err != nil {
                    w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                    w.close()
                }
            // receive nsqd's responses
            case data := <-w.responseChan:
                w.popTransaction(FrameTypeResponse, data)
            case data := <-w.errorChan:
                w.popTransaction(FrameTypeError, data)
            case <-w.closeChan:
                goto exit
            case <-w.exitChan:
                goto exit
            }
        }

    exit:
        w.transactionCleanup()
        w.wg.Done()
        w.log(LogLevelInfo, "exiting router")
    }
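Taken together, `sendCommand`, `sendCommandAsync`, and `router` build a synchronous call on top of one goroutine that owns the connection: each caller enqueues a transaction carrying its own done channel, the router writes the command and remembers the transaction, and because nsqd answers commands on a connection in order, each response belongs to the oldest pending transaction (the assumption behind `popTransaction`). The sketch below is a stripped-down, hypothetical version of that pattern; `fakeNSQD` stands in for the real TCP connection, and the router keeps an outbox so that it can always accept responses while commands are waiting to be written.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// transaction plays the role of ProducerTransaction: a command plus the
// channel its caller is blocked on.
type transaction struct {
	cmd  string
	done chan error
}

type producer struct {
	transactionChan chan *transaction
	conn            chan string // stands in for the TCP write side
	responseChan    chan string // stands in for frames read back from nsqd
}

func newProducer() *producer {
	p := &producer{
		transactionChan: make(chan *transaction),
		conn:            make(chan string),
		responseChan:    make(chan string),
	}
	go fakeNSQD(p.conn, p.responseChan)
	go p.router()
	return p
}

// fakeNSQD answers commands strictly in order, as nsqd does on one connection.
func fakeNSQD(in <-chan string, out chan<- string) {
	for cmd := range in {
		if cmd == "" {
			out <- "E_INVALID"
		} else {
			out <- "OK"
		}
	}
}

// router is the only goroutine touching the connection and the pending list.
func (p *producer) router() {
	var pending []*transaction // written, awaiting a response (FIFO)
	var outbox []string        // accepted, not yet written
	for {
		var send chan string // nil disables the send case when outbox is empty
		var next string
		if len(outbox) > 0 {
			send, next = p.conn, outbox[0]
		}
		select {
		case t := <-p.transactionChan:
			pending = append(pending, t)
			outbox = append(outbox, t.cmd)
		case send <- next:
			outbox = outbox[1:]
		case resp := <-p.responseChan:
			// The oldest pending transaction owns this response (cf. popTransaction).
			t := pending[0]
			pending = pending[1:]
			if resp == "OK" {
				t.done <- nil
			} else {
				t.done <- errors.New(resp)
			}
		}
	}
}

// Publish is synchronous from the caller's view, like sendCommand.
func (p *producer) Publish(cmd string) error {
	t := &transaction{cmd: cmd, done: make(chan error, 1)}
	p.transactionChan <- t
	return <-t.done
}

func main() {
	p := newProducer()
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i, p.Publish(fmt.Sprintf("PUB topic %d", i)))
		}(i)
	}
	wg.Wait()
	fmt.Println(p.Publish("")) // the fake nsqd rejects an empty command
}
```

The goroutines in this demo run until the process exits; go-nsq's real router additionally watches `closeChan` and `exitChan` to shut down cleanly.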
2. Consumer code analysis
    // NewConsumer creates a new instance of Consumer for the specified topic/channel
    //
    // The only valid way to create a Config is via NewConfig, using a struct literal will panic.
    // After Config is passed into NewConsumer the values are no longer mutable (they are copied).
    // The caller specifies the topic and channel to subscribe to.
    func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
        if err := config.Validate(); err != nil {
            return nil, err
        }

        if !IsValidTopicName(topic) {
            return nil, errors.New("invalid topic name")
        }

        if !IsValidChannelName(channel) {
            return nil, errors.New("invalid channel name")
        }

        r := &Consumer{
            id: atomic.AddInt64(&instCount, 1),

            topic:   topic,
            channel: channel,
            config:  *config,

            logger:      make([]logger, LogLevelMax+1),
            logLvl:      LogLevelInfo,
            maxInFlight: int32(config.MaxInFlight),

            incomingMessages: make(chan *Message),

            rdyRetryTimers:     make(map[string]*time.Timer),
            pendingConnections: make(map[string]*Conn),
            connections:        make(map[string]*Conn),

            lookupdRecheckChan: make(chan int, 1),

            rng: rand.New(rand.NewSource(time.Now().UnixNano())),

            StopChan: make(chan int),
            exitChan: make(chan int),
        }

        // Set default logger for all log levels
        l := log.New(os.Stderr, "", log.Flags())
        for index := range r.logger {
            r.logger[index] = l
        }

        r.wg.Add(1)
        // NSQ pushes messages to consumers, so the consumer side controls the
        // consumption rate (flow control via RDY); this is configurable and
        // adjusted automatically
        go r.rdyLoop()
        return r, nil
    }
This initializes the Consumer struct.
After initialization, a message handler must be added with AddHandler.
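The RDY flow control mentioned in the `rdyLoop` comment works per connection: the consumer tells each nsqd how many messages it may have in flight at once, and the configured `MaxInFlight` is spread across all nsqd connections. The sketch below is modelled on go-nsq's per-connection split (its `perConnMaxInFlight`); treat the exact formula as an assumption, and note that the real client also periodically redistributes RDY when there are more connections than `MaxInFlight`.

```go
package main

import (
	"fmt"
	"math"
)

// perConnMaxInFlight splits maxInFlight evenly across numConns connections,
// never going below 1 (so every connection can make progress) or above maxInFlight.
func perConnMaxInFlight(maxInFlight, numConns int) int64 {
	b := float64(maxInFlight)
	s := b / float64(numConns)
	return int64(math.Min(math.Max(1, s), b))
}

func main() {
	for _, n := range []int{1, 3, 10, 200} {
		fmt.Printf("MaxInFlight=100, conns=%d -> RDY %d\n", n, perConnMaxInFlight(100, n))
	}
}
```

With 200 connections and `MaxInFlight=100` every connection still gets RDY 1, so the total can exceed `MaxInFlight`; that over-commitment is what the redistribution logic exists to correct.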
    // AddHandler sets the Handler for messages received by this Consumer. This can be called
    // multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
    //
    // This panics if called after connecting to NSQD or NSQ Lookupd
    //
    // (see Handler or HandlerFunc for details on implementing this interface)
    func (r *Consumer) AddHandler(handler Handler) {
        r.AddConcurrentHandlers(handler, 1)
    }

    // AddConcurrentHandlers sets the Handler for messages received by this Consumer. It
    // takes a second argument which indicates the number of goroutines to spawn for
    // message handling.
    //
    // This panics if called after connecting to NSQD or NSQ Lookupd
    //
    // (see Handler or HandlerFunc for details on implementing this interface)
    func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
        if atomic.LoadInt32(&r.connectedFlag) == 1 {
            panic("already connected")
        }

        atomic.AddInt32(&r.runningHandlers, int32(concurrency))
        for i := 0; i < concurrency; i++ {
            // one handler goroutine per unit of concurrency
            go r.handlerLoop(handler)
        }
    }

    func (r *Consumer) handlerLoop(handler Handler) {
        r.log(LogLevelDebug, "starting Handler")

        for {
            // Keep receiving messages pushed by nsqd; the infinite readLoop
            // (covered later) puts messages onto this channel
            message, ok := <-r.incomingMessages
            if !ok {
                goto exit
            }

            if r.shouldFailMessage(message, handler) {
                message.Finish()
                continue
            }

            // consume the message with the handler we registered
            err := handler.HandleMessage(message)
            if err != nil {
                r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
                if !message.IsAutoResponseDisabled() {
                    message.Requeue(-1)
                }
                continue
            }

            // Finishing a message removes it from the queue (similar to a commit).
            // By default each message is finished automatically once handled;
            // with auto-response disabled you can finish messages yourself, e.g. in batches
            if !message.IsAutoResponseDisabled() {
                message.Finish()
            }
        }

    exit:
        r.log(LogLevelDebug, "stopping Handler")
        if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
            r.exit()
        }
    }

    func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
        // message passed the max number of attempts
        if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
            r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
                message.ID, message.Attempts)

            logger, ok := handler.(FailedMessageLogger)
            if ok {
                logger.LogFailedMessage(message)
            }

            return true
        }
        return false
    }

    func (r *Consumer) exit() {
        r.exitHandler.Do(func() {
            close(r.exitChan)
            r.wg.Wait()
            close(r.StopChan)
        })
    }
    // ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
    //
    // If it is the first to be added, it initiates an HTTP request to discover nsqd
    // producers for the configured topic.
    //
    // A goroutine is spawned to handle continual polling.
    func (r *Consumer) ConnectToNSQLookupd(addr string) error {
        if atomic.LoadInt32(&r.stopFlag) == 1 {
            return errors.New("consumer stopped")
        }
        if atomic.LoadInt32(&r.runningHandlers) == 0 {
            return errors.New("no handlers")
        }

        if err := validatedLookupAddr(addr); err != nil {
            return err
        }

        atomic.StoreInt32(&r.connectedFlag, 1)

        r.mtx.Lock()
        for _, x := range r.lookupdHTTPAddrs {
            if x == addr {
                r.mtx.Unlock()
                return nil
            }
        }
        r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
        numLookupd := len(r.lookupdHTTPAddrs)
        r.mtx.Unlock()

        // if this is the first one, kick off the go loop
        if numLookupd == 1 {
            r.queryLookupd()
            r.wg.Add(1)
            go r.lookupdLoop()
        }

        return nil
    }
The consumer connects to nsqlookupd, queries it for nsqd service information, and then connects to those nsqd instances.
    // make an HTTP req to one of the configured nsqlookupd instances to discover
    // which nsqd's provide the topic we are consuming.
    //
    // initiate a connection to any new producers that are identified.
    func (r *Consumer) queryLookupd() {
        retries := 0

    retry:
        endpoint := r.nextLookupdEndpoint()

        r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)

        var data lookupResp
        err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
        if err != nil {
            r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
            retries++
            if retries < 3 {
                r.log(LogLevelInfo, "retrying with next nsqlookupd")
                goto retry
            }
            return
        }

        var nsqdAddrs []string
        for _, producer := range data.Producers {
            broadcastAddress := producer.BroadcastAddress
            port := producer.TCPPort
            joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
            nsqdAddrs = append(nsqdAddrs, joined)
        }
        // apply filter
        if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
            nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
        }
        // connect to every nsqd that nsqlookupd returned
        for _, addr := range nsqdAddrs {
            err = r.ConnectToNSQD(addr)
            if err != nil && err != ErrAlreadyConnected {
                r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
                continue
            }
        }
    }
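`queryLookupd` sends an HTTP GET to nsqlookupd (the `/lookup?topic=<topic>` endpoint) and joins each producer's broadcast address and TCP port. The sketch below does the same parsing with only the standard library. The JSON shape assumed here, top-level `channels` and `producers` with `broadcast_address` and `tcp_port` fields, is nsqlookupd's v1 response format as requested by `apiRequestNegotiateV1`; the `lookupResponse` and `nsqdAddrs` names are illustrative, not go-nsq exports.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// producerInfo holds the fields of one nsqd entry that a consumer needs.
type producerInfo struct {
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
}

// lookupResponse models the body of GET /lookup?topic=... (v1 format, assumed).
type lookupResponse struct {
	Channels  []string       `json:"channels"`
	Producers []producerInfo `json:"producers"`
}

// nsqdAddrs decodes a lookup response and returns the TCP addresses to dial.
func nsqdAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		// JoinHostPort adds brackets for IPv6 hosts, e.g. "[::1]:4150".
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	body := []byte(`{"channels":["channel"],"producers":[{"broadcast_address":"192.168.1.1","tcp_port":4152,"http_port":4153}]}`)
	addrs, err := nsqdAddrs(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(addrs)
}
```

This is also why `--broadcast-address` matters when starting nsqd: it is the host that consumers will be told to dial.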
The official documentation does not recommend that consumers connect to nsqd directly:
    // ConnectToNSQD takes a nsqd address to connect directly to.
    //
    // It is recommended to use ConnectToNSQLookupd so that topics are discovered
    // automatically. This method is useful when you want to connect to a single, local,
    // instance.
    func (r *Consumer) ConnectToNSQD(addr string) error {
        if atomic.LoadInt32(&r.stopFlag) == 1 {
            return errors.New("consumer stopped")
        }

        if atomic.LoadInt32(&r.runningHandlers) == 0 {
            return errors.New("no handlers")
        }

        atomic.StoreInt32(&r.connectedFlag, 1)

        // initialize the connection object
        conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
        conn.SetLoggerLevel(r.getLogLevel())
        format := fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)
        for index := range r.logger {
            conn.SetLoggerForLevel(r.logger[index], LogLevel(index), format)
        }
        r.mtx.Lock()
        _, pendingOk := r.pendingConnections[addr]
        _, ok := r.connections[addr]
        if ok || pendingOk {
            r.mtx.Unlock()
            return ErrAlreadyConnected
        }
        r.pendingConnections[addr] = conn
        if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
            r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
        }
        r.mtx.Unlock()

        r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)

        cleanupConnection := func() {
            r.mtx.Lock()
            delete(r.pendingConnections, addr)
            r.mtx.Unlock()
            conn.Close()
        }

        // Connect, as seen in the producer analysis; this is where the consumer
        // establishes its connection to nsqd
        resp, err := conn.Connect()
        if err != nil {
            cleanupConnection()
            return err
        }

        if resp != nil {
            if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
                r.log(LogLevelWarning,
                    "(%s) max RDY count %d < consumer max in flight %d, truncation possible",
                    conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
            }
        }

        // Send the subscribe command. This registers the consumer with nsqd; more
        // precisely, it adds this connection to the client list of the channel under
        // the topic. When a message arrives, the channel delivers it to one of the
        // clients in that list.
        cmd := Subscribe(r.topic, r.channel)
        err = conn.WriteCommand(cmd)
        if err != nil {
            cleanupConnection()
            return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
                conn, r.topic, r.channel, err.Error())
        }

        r.mtx.Lock()
        delete(r.pendingConnections, addr)
        r.connections[addr] = conn
        r.mtx.Unlock()

        // pre-emptive signal to existing connections to lower their RDY count
        for _, c := range r.conns() {
            r.maybeUpdateRDY(c)
        }

        return nil
    }
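Subscribing is not enough on its own: nsqd pushes nothing on a connection until the consumer sends `RDY <count>`, which is what `maybeUpdateRDY` eventually does. Per the NSQ TCP protocol both are plain, newline-terminated text commands; the hypothetical helpers below only format them.

```go
package main

import "fmt"

// subCommand formats "SUB <topic> <channel>\n": register this connection
// as a client of the channel.
func subCommand(topic, channel string) []byte {
	return []byte(fmt.Sprintf("SUB %s %s\n", topic, channel))
}

// rdyCommand formats "RDY <count>\n": allow nsqd to have up to count
// unacknowledged (in-flight) messages outstanding on this connection.
func rdyCommand(count int64) []byte {
	return []byte(fmt.Sprintf("RDY %d\n", count))
}

func main() {
	fmt.Printf("%q\n", subCommand("topic", "channel")) // "SUB topic channel\n"
	fmt.Printf("%q\n", rdyCommand(1))                  // "RDY 1\n"
}
```

Splitting subscription (SUB) from flow control (RDY) is what lets the consumer change its rate later without resubscribing.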
go-nsq/conn.go
    // Connect dials and bootstraps the nsqd connection
    // (including IDENTIFY) and returns the IdentifyResponse
    func (c *Conn) Connect() (*IdentifyResponse, error) {
        dialer := &net.Dialer{
            LocalAddr: c.config.LocalAddr,
            Timeout:   c.config.DialTimeout,
        }

        // producers and consumers establish their TCP connection to nsqd here
        conn, err := dialer.Dial("tcp", c.addr)
        if err != nil {
            return nil, err
        }
        c.conn = conn.(*net.TCPConn)
        c.r = conn
        c.w = conn

        // Right after connecting, send a 4-byte magic that selects the protocol
        // version; go-nsq sends MagicV2 ("  V2"), the protocol nsqd speaks
        _, err = c.Write(MagicV2)
        if err != nil {
            c.Close()
            return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
        }

        // tell nsqd about ourselves: heartbeat interval, message timeout, client ID, etc.
        resp, err := c.identify()
        if err != nil {
            return nil, err
        }

        if resp != nil && resp.AuthRequired {
            if c.config.AuthSecret == "" {
                c.log(LogLevelError, "Auth Required")
                return nil, errors.New("Auth Required")
            }
            err := c.auth(c.config.AuthSecret)
            if err != nil {
                c.log(LogLevelError, "Auth Failed %s", err)
                return nil, err
            }
        }

        c.wg.Add(2)
        atomic.StoreInt32(&c.readLoopRunning, 1)
        // these two goroutines are the heart of the connection
        go c.readLoop()
        go c.writeLoop()
        return resp, nil
    }
    func (c *Conn) readLoop() {
        delegate := &connMessageDelegate{c}
        for {
            if atomic.LoadInt32(&c.closeFlag) == 1 {
                goto exit
            }
            // read a frame from nsqd
            frameType, data, err := ReadUnpackedResponse(c)
            if err != nil {
                if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
                    goto exit
                }
                if !strings.Contains(err.Error(), "use of closed network connection") {
                    c.log(LogLevelError, "IO error - %s", err)
                    c.delegate.OnIOError(c, err)
                }
                goto exit
            }
            // heartbeats arrive every 30s by default; more on this later
            if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
                c.log(LogLevelDebug, "heartbeat received")
                c.delegate.OnHeartbeat(c)
                err := c.WriteCommand(Nop())
                if err != nil {
                    c.log(LogLevelError, "IO error - %s", err)
                    c.delegate.OnIOError(c, err)
                    goto exit
                }
                continue
            }

            switch frameType {
            // handle a response
            case FrameTypeResponse:
                c.delegate.OnResponse(c, data)
            // receive a message for consumption
            case FrameTypeMessage:
                msg, err := DecodeMessage(data)
                if err != nil {
                    c.log(LogLevelError, "IO error - %s", err)
                    c.delegate.OnIOError(c, err)
                    goto exit
                }
                msg.Delegate = delegate
                msg.NSQDAddress = c.String()

                atomic.AddInt64(&c.messagesInFlight, 1)
                atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())

                // Push the message received from nsqd onto a channel: the same
                // channel that handlerLoop above is blocked on
                c.delegate.OnMessage(c, msg)
            case FrameTypeError:
                c.log(LogLevelError, "protocol error - %s", data)
                c.delegate.OnError(c, data)
            default:
                c.log(LogLevelError, "IO error - %s", err)
                c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
            }
        }

    exit:
        atomic.StoreInt32(&c.readLoopRunning, 0)
        // start the connection close
        messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
        if messagesInFlight == 0 {
            // if we exited readLoop with no messages in flight
            // we need to explicitly trigger the close because
            // writeLoop won't
            c.close()
        } else {
            c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
        }
        c.wg.Done()
        c.log(LogLevelInfo, "readLoop exiting")
    }
    func (c *Conn) writeLoop() {
        for {
            select {
            case <-c.exitChan:
                c.log(LogLevelInfo, "breaking out of writeLoop")
                // Indicate drainReady because we will not pull any more off msgResponseChan
                close(c.drainReady)
                goto exit
            case cmd := <-c.cmdChan:
                err := c.WriteCommand(cmd)
                if err != nil {
                    c.log(LogLevelError, "error sending command %s - %s", cmd, err)
                    c.close()
                    continue
                }
            case resp := <-c.msgResponseChan:
                // Decrement this here so it is correct even if we can't respond to nsqd
                msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

                if resp.success {
                    c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
                    c.delegate.OnMessageFinished(c, resp.msg)
                    c.delegate.OnResume(c)
                } else {
                    c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
                    c.delegate.OnMessageRequeued(c, resp.msg)
                    if resp.backoff {
                        c.delegate.OnBackoff(c)
                    } else {
                        c.delegate.OnContinue(c)
                    }
                }

                err := c.WriteCommand(resp.cmd)
                if err != nil {
                    c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
                    c.close()
                    continue
                }

                if msgsInFlight == 0 &&
                    atomic.LoadInt32(&c.closeFlag) == 1 {
                    c.close()
                    continue
                }
            }
        }

    exit:
        c.wg.Done()
        c.log(LogLevelInfo, "writeLoop exiting")
    }
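Once a message has been handled, the consumer sends a FIN command to nsqd through writeLoop, telling nsqd which messages have been consumed and can be removed from the queue.
Everything above is code from the go-nsq client; we have not yet looked at NSQ's own code. Let's summarize first, then move on to the nsqd code.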
Producer
Consumer