在上一篇文章中對 nsq 進行了簡單的介紹,並從 nsq 的 Go 客戶端代碼分析了 nsq 的使用方式;這篇文章會分析 nsqd 的代碼。
nsqd 做了什麼
nsqadmin 是一個簡單的管理界面,通過它可以查詢 topic、channel、消費者等基本信息。nsqadmin 是從 nsqlookup 中獲取信息的;通過 nsqadmin 也可以創建 topic、channel,它們會被創建到 nsqlookup 中,並維護在 nsqlookup 的內存裏,nsqd 會在某個合適的時刻將這些信息拉回本地,然後在本地創建。
nsqd 啓動
func (n *NSQD) Main() error { ctx := &context{n} exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { n.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } n.tcpServer.ctx = ctx // 啓動 tcp監聽 n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) // 啓動http監聽 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } // 隊列掃描,處理超時、延遲等信息 n.waitGroup.Wrap(n.queueScanLoop) // 向nsqlookup註冊本身的元數據信息 n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err }
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) var wg sync.WaitGroup for { //等待請求的到來 clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } wg.Add(1) // 每當到來一個請求都啓動一個goroutine進行處理 go func() { handler.Handle(clientConn) wg.Done() }() } // wait to return until all handler goroutines complete wg.Wait() logf(lg.INFO, "TCP: closing %s", listener.Addr()) return nil }
unc (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } //協商協議版本 protocolMagic := string(buf) p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } p.conns.Store(clientConn.RemoteAddr(), clientConn) // 開始一個死循環 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } p.conns.Delete(clientConn.RemoteAddr()) }
func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize // goroutine local state derived from client attributes // and avoid a potential race with IDENTIFY (where a client // could have changed or disabled said attributes) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) // 消息分發,向消費者發送消息 <-messagePumpStartedChan for { // 設置socket讀取超時,若是consumer未在指定的時間內發送過來,那麼會斷開鏈接,致使consumer退出 if client.HeartbeatInterval > 0 { client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2)) } else { client.SetReadDeadline(zeroTime) } // ReadSlice does not allocate new space for the data each request // ie. the returned slice is only valid until the next call to it //讀取生產者或者消費者發送過來的請求 line, err = client.Reader.ReadSlice('\n') if err != nil { if err == io.EOF { err = nil } else { err = fmt.Errorf("failed to read command - %s", err) } break } // trim the '\n' line = line[:len(line)-1] // optionally trim the '\r' if len(line) > 0 && line[len(line)-1] == '\r' { line = line[:len(line)-1] } params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte // 根據不一樣的命令執行不一樣的動做 response, err = p.Exec(client, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } 
if response != nil { err = p.Send(client, frameTypeResponse, response) if err != nil { err = fmt.Errorf("failed to send response - %s", err) break } } } p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) conn.Close() close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } p.ctx.nsqd.RemoveClient(client.ID) return err }
在繼續往下看之前,先看一下生產者的 PUB 請求在 nsqd 中做了什麼
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters") } topicName := string(params[1]) if !protocol.IsValidTopicName(topicName) { return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("PUB topic name %q is not valid", topicName)) } bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") } if bodyLen <= 0 { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB invalid message body size %d", bodyLen)) } if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, messageBody) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") } if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { return nil, err } // topic 在nsqd中的建立的lazy create,只有當某個生產者向該topic中發送消息時纔會建立topic, topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } client.PublishedMessage(topicName, 1) return okBytes, nil }
/ GetTopic performs a thread safe operation // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { // most likely, we already have this topic, so try read lock first. n.RLock() // 當topic在nsqd中建立過期就直接返回該topic t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } n.Lock() t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } //稍後看一下這個函數 t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.Unlock() n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // topic is created but messagePump not yet started // if loading metadata at startup, no lookupd connections yet, topic started after load if atomic.LoadInt32(&n.isLoading) == 1 { return t } // if using lookupd, make a blocking call to get the topics, and immediately create them. // this makes sure that any message received is buffered to the right channels //若是使用了nsqlookup,那麼從nsqlookup中查詢該topic的channel信息,若是沒有在nsqd中建立就建立出來 lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) if err != nil { n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) } for _, channelName := range channelNames { if strings.HasSuffix(channelName, "#ephemeral") { continue // do not create ephemeral channel with no consumer client } t.GetChannel(channelName) } } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 { n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name) } // now that all channels are added, start topic messagePump t.Start() return t }
// Topic constructor func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), memoryMsgChan: nil, startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), ctx: ctx, paused: 0, pauseChan: make(chan int), deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } // create mem-queue only if size > 0 (do not use unbuffered chan) if ctx.nsqd.getOpts().MemQueueSize > 0 { t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { opts := ctx.nsqd.getOpts() lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } //持久化的結構 t.backend = diskqueue.New( topicName, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, ctx.nsqd.getOpts().SyncEvery, ctx.nsqd.getOpts().SyncTimeout, dqLogf, ) } // topic中也啓動了一個messagePump,在protocolv2中也啓動了一個同名函數,前一個是爲了向consumer推送消息,這個是向topic下的一個或者多個隊列中發送消息 t.waitGroup.Wrap(t.messagePump) // 通知持久化 t.ctx.nsqd.Notify(t) return t } func (t *Topic) Start() { select { case t.startChan <- 1: default: } }
再看一下 nsqd 是如何向 nsqlookup 註冊自己的元數據信息的:nsqd 啓動時會啓動一個 goroutine 執行 lookupLoop。
func (n *NSQD) lookupLoop() { var lookupPeers []*lookupPeer var lookupAddrs []string connect := true hostname, err := os.Hostname() if err != nil { n.logf(LOG_FATAL, "failed to get hostname - %s", err) os.Exit(1) } // for announcements, lookupd determines the host automatically ticker := time.Tick(15 * time.Second) for { if connect { for _, host := range n.getOpts().NSQLookupdTCPAddresses { if in(host, lookupAddrs) { continue } n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host) lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf, connectCallback(n, hostname)) lookupPeer.Command(nil) // start the connection lookupPeers = append(lookupPeers, lookupPeer) lookupAddrs = append(lookupAddrs, host) } n.lookupPeers.Store(lookupPeers) connect = false } select { case <-ticker: // 向nsqlookup發送心跳信息 // send a heartbeat and read a response (read detects closed conns) for _, lookupPeer := range lookupPeers { n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer) cmd := nsq.Ping() _, err := lookupPeer.Command(cmd) if err != nil { n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) } } case val := <-n.notifyChan: var cmd *nsq.Command var branch string switch val.(type) { // 註冊channel case *Channel: // notify all nsqlookupds that a new channel exists, or that it's removed branch = "channel" channel := val.(*Channel) if channel.Exiting() == true { cmd = nsq.UnRegister(channel.topicName, channel.name) } else { cmd = nsq.Register(channel.topicName, channel.name) } // 註冊topic case *Topic: // notify all nsqlookupds that a new topic exists, or that it's removed branch = "topic" topic := val.(*Topic) if topic.Exiting() == true { cmd = nsq.UnRegister(topic.name, "") } else { cmd = nsq.Register(topic.name, "") } } for _, lookupPeer := range lookupPeers { n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd) _, err := lookupPeer.Command(cmd) if err != nil { n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) } } case 
<-n.optsNotificationChan: var tmpPeers []*lookupPeer var tmpAddrs []string for _, lp := range lookupPeers { if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) { tmpPeers = append(tmpPeers, lp) tmpAddrs = append(tmpAddrs, lp.addr) continue } n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp) lp.Close() } lookupPeers = tmpPeers lookupAddrs = tmpAddrs connect = true case <-n.exitChan: goto exit } } exit: n.logf(LOG_INFO, "LOOKUP: closing") }
nsqd 在啓動 lookupLoop 這個 goroutine 的同時,還啓動了另外一個 goroutine queueScanLoop,主要用來監控並處理超時的消息。
總結一下
注意:consumer 消費消息是有超時配置的,消費者必須在超時時間內處理完每一條消息,要不然會導致一些問題(例如消息因超時被重新入隊,造成重複投遞)。