nsqlookupd 用於Topic, Channel, Node 三類信息的一致性分發node
程序入口文件: /apps/nsqlookupd/main.go
爲了時NSQ 在windows 良好運行,NSQ 使用了 github.com/judwhite/go-svc/svc
包,用於構建一個可實現windows 服務。 能夠用windows 的服務管理插件直接管理。web
svc 包使用時,只須要實現 github.com/judwhite/go-svc/svc.Service
type Service interface { // Init is called before the program/service is started and after it's // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error }
所以,nsqlookup 只須要實現上述三個方法便可:編程
此方法僅針對windows 的服務作了處理。若爲windows 服務,則修改當前目錄爲可執行文件的目錄。json
此方法作了nsqlookupd.Exit() 的處理。
此處用到了sync.Once. 即調用的退出程序僅執行一次。windows
func (l *NSQLookupd) Exit() { if l.tcpListener != nil { l.tcpListener.Close() } if l.httpListener != nil { l.httpListener.Close() } l.waitGroup.Wait() }
NSQ 命令行參數的構造,採用了golang 自帶的flag 包。參數保存於Options對象中,採用了先初始化,後賦值的方式,減小了沒必要要的條件判斷。
能夠採用--config 的方式,直接添加配置文件。配置文件採用toml格式.
// RegistrationDB 使用讀寫鎖作讀寫控制。 type RegistrationDB struct { sync.RWMutex registrationMap map[Registration]ProducerMap } type Registration struct { Category string // Category 有三種類型,Topic, Channel, Client. Key string SubKey string } type ProducerMap map[string]*Producer type Producer struct { peerInfo *PeerInfo //客戶端的相關信息 tombstoned bool tombstonedAt time.Time } type PeerInfo struct { lastUpdate int64 // 上次更新的時間 id string // 使用ip標識的id RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` }
tcp 消息是 nsqd 與nsqlookupd 溝通的協議。 node 保存的是nsqd 的信息
Tcp Listener 是用來監聽客戶端發來的TCP 消息。
創建鏈接後,發送4個byte標識鏈接的版本號。目前是v1. "__V1" (下劃線用空格替代)
|8bit |1 bit | 32bit | N bit | |IDENTIFY| 換行 | body 長度 | body |
** http 客戶端的定位是用於服務的發現和admin的交互 **
包,此包是對golang 中http請求handler 的一次封裝:type Decorator func(APIHandler) APIHandler type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) // f 是業務處理邏輯, ds 能夠自定義多個包裝器,用於對f 的輸入和輸出數據作處理。 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { decorated := f for _, decorate := range ds { decorated = decorate(decorated) } return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { decorated(w, req, ps) } } // Decorator 的一個例子,作日誌記錄的處理 func Log(logf lg.AppLogFunc) Decorator { return func(f APIHandler) APIHandler { return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { start := time.Now() response, err := f(w, req, ps) elapsed := time.Since(start) status := 200 if e, ok := err.(Err); ok { status = e.Code } logf(lg.INFO, "%d %s %s (%s) %s", status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed) return response, err } } }
這種處理方式相似於大部分web框架HTTP 中間件的處理方式,是利用遞歸嵌套的方式,保留了處理的上下文, 實現服務切片編程。
http 服務,使用github.com/julienschmidt/httprouter
包實現http 的路由功能。
目前HTTP 客戶端支持如下的請求:
Method | Router | Param | Response |
GET | /ping | - | "OK" |
GET | /info | - | 返回版本信息 |
GET | /debug | - | 返回 db 中全部信息 |
GET | /lookup | topic | 返回topic 關聯的全部的channels 和 nsqd 服務的信息 |
GET | /topics | - | 返回全部topic 的值 |
GET | /channels | topic | 返回topic 下全部的channels 信息 |
GET | /nodes | - | 返回全部在線的nsqd 的node 信息, node 節點中包含了 topic 的信息,以及是否須要被刪除 |
POST | /topic/create | topic | 建立topic <不超過64個字符長度> |
POST | /topic/delete | topic | 刪除相應topic 的channel 和topic 信息 |
POST | /channel/create | topic, channel | 建立 channel , 若topic 不存在,建立topic |
POST | /channel/delete | topic, channel | 刪除 channel, 支持 * |
POST | /topic/tombstone | topic, node | 將topic 下某個node 設置刪除標識 tombstone, 給node 節點 一段空餘時間用於刪除相關topic 信息,併發送刪除topic的命令 |
GET | /debug/pprof | - | pprof 提供的信息 |
GET | /debug/pprof/cmdline | - | pprof 提供的信息 |
GET | /debug/pprof/symbol | - | pprof 提供的信息 |
POST | /debug/pprof/symbol | - | pprof 提供的信息 |
GET | /debug/pprof/profile | - | pprof 提供的信息 |
GET | /debug/pprof/heap | - | pprof 提供的信息 |
GET | /debug/pprof/goroutine | - | pprof 提供的信息 |
GET | /debug/pprof/block | - | pprof 提供的信息 |
GET | /debug/pprof/threadcreate | - | pprof 提供的信息 |