消息隊列 NSQ 源碼學習筆記 (一)

nsqlookupd 用於Topic, Channel, Node 三類信息的一致性分發node

概要

nsqlookup 知識點總結

  • 功能定位git

    • 爲node 節點和客戶端節點提供一致的topic, channel, node 查詢服務
      • Topic 主題, 和大部分消息隊列的含義一致, 消息處理時,將相同主題的數據會歸爲一類消息
      • channel,能夠理解爲 topic 的一份數據拷貝,一個或者多個消費者對接一個channel。
      • node nsqd 啓動的一個實例
      • 一個channel會放置在某一個node 節點上,一個topic 下能夠有多個channel.
    • HTTP 接口 用於客戶端服務發現以及admin 的交戶使用
    • TCP 接口 用於 node 節點作消息廣告使用
  • 實現方式github

    • 數據包括了Topic, Channel, Node 等信息,所有存儲於RegistrationDB中,RegistrationDB 採用讀寫鎖和 map 實現,數據均存儲於內存中
    • 若存在多個nsqlookup 節點,各節點之間無耦合關係

nsqlookupd 源碼閱讀

程序入口文件: /apps/nsqlookupd/main.gogolang

爲了時NSQ 在windows 良好運行,NSQ 使用了 github.com/judwhite/go-svc/svc 包,用於構建一個可實現windows 服務。 能夠用windows 的服務管理插件直接管理。web

svc 包使用時,只須要實現 github.com/judwhite/go-svc/svc.Service 的接口便可。接口以下:sql

type Service interface {
	// Init is called before the program/service is started and after it's
	// determined if the program is running as a Windows Service.
	Init(Environment) error

	// Start is called after Init. This method must be non-blocking.
	Start() error

	// Stop is called in response to os.Interrupt, os.Kill, or when a
	// Windows Service is stopped.
	Stop() error
}

所以,nsqlookup 只須要實現上述三個方法便可:編程

Init 方法

此方法僅針對windows 的服務作了處理。若爲windows 服務,則修改當前目錄爲可執行文件的目錄。json

Stop 方法

此方法作了nsqlookupd.Exit() 的處理。
此處用到了sync.Once. 即調用的退出程序僅執行一次。windows

Exit 的具體內容爲:api

func (l *NSQLookupd) Exit() {
	if l.tcpListener != nil {
		l.tcpListener.Close()
	}

	if l.httpListener != nil {
		l.httpListener.Close()
	}
	l.waitGroup.Wait()
}
  1. 關閉 TCP Listener
  2. 關閉 Http Listener
  3. 等待全部goroutine的退出 (此處用到了sync.WaitGroup,用於等待goroutine 的退出)

Start 方法

參數的初始化

NSQ 命令行參數的構造,採用了golang 自帶的flag 包。參數保存於Options對象中,採用了先初始化,後賦值的方式,減小了沒必要要的條件判斷。
能夠採用--config 的方式,直接添加配置文件。配置文件採用toml格式.
配置的解析,採用github.com/mreiferson/go-options 實現,優先級由高到低爲:

  • 命令行參數
  • deprecated 的命令行參數名稱
  • 配置文件的值 (將命令行參數,連字符替換爲下劃線做爲配置文件的key)
  • 若參數實現了Getter,則使用Get() 方法
  • 參數默認值

構造nsqlookupd

  • 初始化一個RegistrationDB
  • 創建 HttpListener 和 tcpListener (客戶端請求)
  • 啓動服務,等待鏈接請求或者中斷信號

RegistrationMap 的實現

// RegistrationDB 使用讀寫鎖作讀寫控制。
type RegistrationDB struct {
	sync.RWMutex
	registrationMap map[Registration]ProducerMap
}

type Registration struct {
	Category string   // Category 有三種類型,Topic, Channel, Client.
	Key      string
	SubKey   string
}

type ProducerMap map[string]*Producer

type Producer struct {
	peerInfo     *PeerInfo //客戶端的相關信息
	tombstoned   bool
	tombstonedAt time.Time
}

type PeerInfo struct {
	lastUpdate       int64   // 上次更新的時間
	id               string  // 使用ip標識的id
	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`
}

接口閱讀

TcpListener

tcp 消息是 nsqd 與nsqlookupd 溝通的協議。 node 保存的是nsqd 的信息

Tcp Listener 是用來監聽客戶端發來的TCP 消息。
創建鏈接後,發送4個byte標識鏈接的版本號。目前是v1. "__V1" (下劃線用空格替代)
消息之間按照換行符\n分割。

目前客戶端支持4類消息:

  • PING
    • 返回OK
    • 若存在對端的信息,則更新client.peerInfo.lastUpdate <上次更新時間>
  • IDENTIFY
    • 用於消息的認證,將nsqd信息發送給nsqlookupd.
    • 消息格式 IDENTIFY\nBODYLEN(32bit)BODY
      |8bit    |1 bit | 32bit     | N bit |
      |IDENTIFY| 換行 | body 長度  | body  |
    • BODY 爲json格式
    • 包含了以下字段:
      • 廣播地址
      • TCP 端口
      • HTTP 端口
      • 版本號
      • 服務器地址 (經過鏈接直接獲取)
  • REGISTER
    • 將nsqd 中註冊的topic 和channel 信息發送到nsqlookupd 上,作信息共享
  • UNREGISTER
    • 將nsqd 中註銷的topic 和channel 信息發送到nsqlookupd 上,作信息共享

HTTPListener

** http 客戶端的定位是用於服務的發現和admin的交互 **

  1. 在學習 http 請求時,能夠先學習下 nsq/internal/http_api 包,此包是對golang 中http請求handler 的一次封裝:
type Decorator func(APIHandler) APIHandler
type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)

// f 是業務處理邏輯, ds 能夠自定義多個包裝器,用於對f 的輸入和輸出數據作處理。
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
	decorated := f
	for _, decorate := range ds {
		decorated = decorate(decorated)
	}
	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
		decorated(w, req, ps)
	}
}

// Decorator 的一個例子,作日誌記錄的處理
func Log(logf lg.AppLogFunc) Decorator {
	return func(f APIHandler) APIHandler {
		return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
			start := time.Now()
			response, err := f(w, req, ps)
			elapsed := time.Since(start)
			status := 200
			if e, ok := err.(Err); ok {
				status = e.Code
			}
			logf(lg.INFO, "%d %s %s (%s) %s",
				status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
			return response, err
		}
	}
}

這種處理方式相似於大部分web框架HTTP 中間件的處理方式,是利用遞歸嵌套的方式,保留了處理的上下文, 實現服務切片編程。

  1. http 服務,使用github.com/julienschmidt/httprouter包實現http 的路由功能。

  2. 目前HTTP 客戶端支持如下的請求:

Method Router Param Response
GET /ping - "OK"
GET /info - 返回版本信息
GET /debug - 返回 db 中全部信息
GET /lookup topic 返回topic 關聯的全部的channels 和 nsqd 服務的信息
GET /topics - 返回全部topic 的值
GET /channels topic 返回topic 下全部的channels 信息
GET /nodes - 返回全部在線的nsqd 的node 信息, node 節點中包含了 topic 的信息,以及是否須要被刪除
POST /topic/create topic 建立topic <不超過64個字符長度>
POST /topic/delete topic 刪除相應topic 的channel 和topic 信息
POST /channel/create topic, channel 建立 channel , 若topic 不存在,建立topic
POST /channel/delete topic, channel 刪除 channel, 支持 *
POST /topic/tombstone topic, node 將topic 下某個node 設置刪除標識 tombstone, 給node 節點 一段空餘時間用於刪除相關topic 信息,併發送刪除topic的命令
GET /debug/pprof - pprof 提供的信息
GET /debug/pprof/cmdline - pprof 提供的信息
GET /debug/pprof/symbol - pprof 提供的信息
POST /debug/pprof/symbol - pprof 提供的信息
GET /debug/pprof/profile - pprof 提供的信息
GET /debug/pprof/heap - pprof 提供的信息
GET /debug/pprof/goroutine - pprof 提供的信息
GET /debug/pprof/block - pprof 提供的信息
GET /debug/pprof/threadcreate - pprof 提供的信息

學習總結

相關文章
相關標籤/搜索