nsq源碼分析

nsq的源碼比較簡單,值得一讀,特別是golang開發人員,下面重點介紹nsqd,nsqd是nsq的核心,其餘的都是輔助工具,看完這篇文章但願你能對消息隊列的原理和實現有必定的瞭解。
nsqd是一個守護進程,負責接收,排隊,投遞消息給客戶端,並不保證消息的嚴格順序,nsqd默認監聽一個tcp端口 (4150) 和一個http端口 (4151) 以及一個可選的https端口
對訂閱了同一個topic的同一個channel的消費者使用負載均衡策略,其實就是多個協程消費同一個channel
只要channel存在,即便沒有該channel的消費者,也會將生產者的message緩存到隊列(內存隊列和磁盤隊列)中,當有新的消費者產生後,就開始消費隊列中的全部消息
保證隊列中的 message 至少會被消費一次(在進程意外退出的時候這點都保證不了),並不能保證成功消費一次,即便 nsqd退出,也會將隊列中的消息暫存磁盤上(進程退出的時候會將緩存中的消息存到磁盤上,意外狀況如掉電就不行了,緩存中的消息就沒有機會存盤而丟失,在實戰中通常不會使用緩存隊列即內存buffer爲0,所有使用磁盤隊列)
限定內存佔用,可以配置nsqd中每一個channel隊列在內存中緩存的message數量,一旦channel的buffer寫滿,就將message寫到磁盤中,這點使用golang select的優先級功能,default優先級最低
topic,channel 一旦創建,將會一直存在,要及時在管理臺或者用代碼清除無效的 topic 和 channel,避免資源的浪費,每一個topic和channel都有獨立的協程處理自身的消息,默認的buffer和其餘的一些信息
nsq消息沒有備份,一旦出現進程意外狀況退出,可能會出現消息丟失,如沒有消費成功的消息,寫入文件但沒有真正落盤的消息,這種意外狀況很難杜絕,像意外退出這種狀況kafka,redis等都會遇到這樣的問題,最後都會採用一個折中的策略,定時將數據落盤
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type Topic struct {
   // 64bit atomic vars need to be first for proper alignment on 32bit platforms
   messageCount uint64  //消息總數量
   messageBytes uint64  //消息總長度
   sync.RWMutex
   name              string   //topic name
   channelMap        map[string]*Channel //保存topic下面的全部channel
   backend           BackendQueue //磁盤隊列
   memoryMsgChan     chan *Message //內存隊列
   startChan         chan int
   exitChan          chan int
   channelUpdateChan chan int
   waitGroup         util.WaitGroupWrapper
   exitFlag          int32  //退出標記
   idFactory         *guidFactory //生成msg id的工廠
   ephemeral      bool  //是否臨時topic
   deleteCallback func(*Topic)  //刪除topic方法指針
   deleter        sync.Once
   paused    int32   //暫停標記,1暫停, 0正常
   pauseChan chan int
   ctx *context
}

 

Topic建立
nsqd用map[string]*Topic來保存全部topic,producter在發消息的時候回指定topic,nsqd在收到消息後會判斷topic是否存在,不存在就會自動建立,每建立一個新的topic就會啓動一個協程,用於處理topic相關的消息,如將內存/磁盤中的消息複製給topic中的每一個channel、channel數量變化、channel暫停、topic退出
消息結構
 
// Command represents a command from a client to an NSQ daemon
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type Command struct {
   Name   []byte   //命令名稱,可選:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH
   Params [][]byte //不一樣的命令作不一樣解析,涉及到topic的,Params[0]爲topic name
   Body   []byte   //消息內容
}
 
// WriteTo implements the WriterTo interface and
// serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered
// to avoid performing many system calls.
func (c *Command) WriteTo(w io.Writer) (int64, error) {
   var total int64
   var buf [4]byte
 
   n, err := w.Write(c.Name)  //命名名稱,nsqd根據這個名稱執行相關功能
   total += int64(n)
   if err != nil {
      return total, err
   }
 
   for _, param := range c.Params {
      n, err := w.Write(byteSpace)  //空格
      total += int64(n)
      if err != nil {
         return total, err
      }
      n, err = w.Write(param)  //參數
      total += int64(n)
      if err != nil {
         return total, err
      }
   }
 
   n, err = w.Write(byteNewLine)   //空行\n
   total += int64(n)
   if err != nil {
      return total, err
   }
 
   //消息內容
   if c.Body != nil {   
      bufs := buf[:]
      binary.BigEndian.PutUint32(bufs, uint32(len(c.Body)))
      n, err := w.Write(bufs)   //消息長度4字節
      total += int64(n)
      if err != nil {
         return total, err
      }
      n, err = w.Write(c.Body)  //消息內容
      total += int64(n)
      if err != nil {
         return total, err
      }
   }
 
   return total, nil
}
 
nsqd收到這個結構作解析,就能知道命令名稱(幹什麼),topic name,消息內容等,不一樣的命令,命令參數不同
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
   if bytes.Equal(params[0], []byte("IDENTIFY")) {
      return p.IDENTIFY(client, params)
   }
   err := enforceTLSPolicy(client, p, params[0])
   if err != nil {
      return nil, err
   }
   switch {
   case bytes.Equal(params[0], []byte("FIN")):
      return p.FIN(client, params)
   case bytes.Equal(params[0], []byte("RDY")):
      return p.RDY(client, params)
   case bytes.Equal(params[0], []byte("REQ")):
      return p.REQ(client, params)
   case bytes.Equal(params[0], []byte("PUB")):
      return p.PUB(client, params)
   case bytes.Equal(params[0], []byte("MPUB")):
      return p.MPUB(client, params)
   case bytes.Equal(params[0], []byte("DPUB")):
      return p.DPUB(client, params)
   case bytes.Equal(params[0], []byte("NOP")):
      return p.NOP(client, params)
   case bytes.Equal(params[0], []byte("TOUCH")):
      return p.TOUCH(client, params)
   case bytes.Equal(params[0], []byte("SUB")):
      return p.SUB(client, params)
   case bytes.Equal(params[0], []byte("CLS")):
      return p.CLS(client, params)
   case bytes.Equal(params[0], []byte("AUTH")):
      return p.AUTH(client, params)
   }
   return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
 
Topic收到消息
nsqd收到上面這個結構,解析以後,就會執行相關功能,咱們以PUB命令爲例:
1:讀到空行處,能拿到命令名稱和參數,命令名稱=PUB,命令參數爲topicName
2:檢查topicName是否有效
3:獲取消息內容長度,讀取4個字節
4:分配對應內容長度空間,讀取對應長度字節存入
5:獲取topicName信息,沒有就建立
6:構造消息結構體nsqd.Message,自動生成消息id
7:將消息提交給對應的topic,Topic.PutMessage
8:將消息寫入topic對應的內存消息通道,內存消息通道默認大小爲10000,如通道滿了則寫入磁盤
 
Topic中的消息分發給channel
在建立topic的時候回啓動一個協程處理各類消息,其中就包括消費topic中的消息,topic只是將消息投遞到其中的每一個channel中,如topic下面有10個channel,則要複製9個nsqd.Message,每一個channel一個nsqd.Message,可是消息id和消息內容是同樣的,消息內容並不會被複制,topic收到消息將消息分發給channel就完事了,消息怎麼發給消費者,由channel負責
 
type Channel struct {
   // 64bit atomic vars need to be first for proper alignment on 32bit platforms
   requeueCount uint64   //從新入隊數量
   messageCount uint64   //消息數量
   timeoutCount uint64   //超時數量,已經消費,但沒有反饋結果,會從新加入隊列,messageCount不會自增
 
   sync.RWMutex
 
   topicName string    //topic name
   name      string    //channel name
   ctx       *context
 
   backend BackendQueue  //將消息寫入磁盤的隊列,維護磁盤消息的讀寫
 
   memoryMsgChan chan *Message //內存消息隊列,通道buffer默認10000
   exitFlag      int32   //退出標記,1表示退出,0沒有退出
   exitMutex     sync.RWMutex
 
   // state tracking
   clients        map[int64]Consumer   //鏈接到這個topic-channel的全部client
   paused         int32  //暫停標記,0不暫停,1暫停,暫停就不會往這個channel中copy消息
   ephemeral      bool   //臨時channel標記,臨時channel不會存到文件中
   deleteCallback func(*Channel) //用於從topic中刪除channel
   deleter        sync.Once  
 
   // Stats tracking
   e2eProcessingLatencyStream *quantile.Quantile
 
   // TODO: these can be DRYd up
   deferredMessages map[MessageID]*pqueue.Item  //延遲消息map,方便查找
   deferredPQ       pqueue.PriorityQueue  //延遲消息隊列
   deferredMutex    sync.Mutex
   inFlightMessages map[MessageID]*Message  //消費中的消息map,方便查找
   inFlightPQ       inFlightPqueue   //消費中的消息隊列
   inFlightMutex    sync.Mutex
}

 

client訂閱topic消息
訂閱發送的仍是Command這個結構,只不過訂閱沒有消息內容而已,指定topic和channel就行,若是topic和channel不存在都會自動建立,client和server創建的是tcp長鏈接,server會啓動兩個協程,一個用於發消息,一個用於接收消息,創建鏈接後,channel會把client加入它的map[int64]Consumer中,key爲clientId,當topic收到消息後,會分發給channel,channel經過發消息的協程發給client
 
channel將消息推給消費者
channel中的消息存在兩個地方:內存通道和磁盤隊列,topic將消息分發給channel時,經過go的select將消息分發給內存通道或是磁盤隊列,因爲select的default分支優先級比case低,因此只要內存通道沒滿,就會往內存通道中寫,不然就寫入磁盤,
diskqueue.diskQueue維護着磁盤數據的讀寫,每一個非臨時的topic和channel都有這樣一個字段。
發消息的協程就會一直讀內存通道和磁盤隊列中的數據,將消息發給client
 
 
nsq消息類型有三種以下:
// frame types
const (
   FrameTypeResponse int32 = 0   //響應
   FrameTypeError    int32 = 1   //錯誤
   FrameTypeMessage  int32 = 2   //消息
)
消息發送給client以後,也不知道消息到底有沒有消費成功,有可能client收到消息以後就崩潰了,因此消息發給client以後,須要client給server發一個FIN消息告訴server,這個消息我消費成功,因此在將消息發送給client以後,消息出了內存隊列/磁盤隊列,進入了一個新的隊列,叫飛行隊列,表示這個消息正在運輸消費中,爲了維護在消費中的消息,nsq使用了兩個數據結構:
type inFlightPqueue []*Message  
inFlightPQ       inFlightPqueue //按照超時時間排序的最小堆
inFlightMessages map[MessageID]*Message //保存消息
消息發送給client以後,同時會將消息存入inFlightPQ和inFlightMessages中,inFlightPQ中的消息都設置了超時時間默認是1分鐘,若是1分鐘後尚未收到client發過來的FIN消息,會將消息從新加入待消費隊列,讓client從新消費,目的是想保證每一個消息至少被消費一次,因爲消息可保存在內存中,進程可能隨時掛掉並不能保證每一個消息都至少被消費一次,若是不用內存隊列,徹底使用磁盤隊列,當進程意外崩掉的時候,消息是否丟失要看磁盤隊列的具體實現,徹底使用磁盤隊列性能差點,安全性更高
inFlightMessages就是爲了方便經過消息id查找消息,收到client發送過來的FIN消息時就會將消息從inFlightPQ和inFlightMessages中刪除,表示這個消息已經消費成功,數據也就被扔掉了
 
延遲消息
發延遲消息和發普通消息的區別是producter在生成延遲消息的時候指定了延遲時間,單位毫秒,命令:DPUB
延遲消息存在內存中,並無存到磁盤中,延遲消息要是存在磁盤中,實現起來仍是比較複雜
延遲消息一樣使用了一個隊列和一個map,結構以下:
type Item struct {
   Value    interface{}  //*Message
   Priority int64 //執行的時間戳,單位毫秒
   Index    int   //隊列索引
}
type PriorityQueue []*Item
deferredPQ       pqueue.PriorityQueue
deferredMessages map[MessageID]*pqueue.Item
deferredPQ和inFlightPQ同樣,是按照時間排序的最小堆

 

那麼nsq是怎麼判斷消息超時,延遲消息的執行時間到了呢?
nsq有一個專門的協程來處理這兩種狀況,實現也很簡單,就是每100毫秒檢查一次,看是否有超時的消息,延遲消息是否執行時間是否到了,若是消息超時,則從新將消息加入待消費隊列,每次將消息發送給client的時候,重試次數都會加一,即Message.Attempts++
延遲消息執行時間要是到了,就會當作一個普通的消息加入待消費隊列,後面的流程都是同樣的,默認最大延遲時間爲1小時,全部的默認值在進程啓動時都是可從新指定的
 
nsqd啓動過程
1:加載啓動參數
啓動參數定義告終構nsqd.Options,並初始化好了默認值,在進程啓動的時候能夠指定對應的值,經過反射將這些參數賦給nsqd.Options,經過nsqd.Options就能方便的使用各個參數
2:加載topic和channel並啓動
在nsqd啓動的時候會加載配置文件nsqd.dat,驗證topic和channel名稱格式是否有效,而後啓動全部topic,該暫停的就暫停,當topic和channel發生變動的時候回將全部信息從新保存到nsqd.dat中,如新增/刪除/暫停/啓動topic和channel會保存文件
topic和channel保存到文件中的結構
type meta struct {
   Topics []struct {
      Name     string `json:"name"`
      Paused   bool   `json:"paused"`
      Channels []struct {
         Name   string `json:"name"`
         Paused bool   `json:"paused"`
      } `json:"channels"`
   } `json:"topics"`
}

 

3:啓動tcp/http/https服務
nsq能夠經過tcp和http經過服務,http和https提供的服務是同樣,區別在於協議自己,當client經過tcp和server創建鏈接後,server會啓動兩個協程,一個用於發消息,一個用於收消息
tcp提供的服務以下:
 
服務命令 服務描述
INENTIFY 認證
FIN 消費完成
RDY 指定可同時處理的消息數量
REQ 消息從新加入隊列
PUB 發佈單條消息
MPUB 發佈多條消息
DPUB 發佈單條延遲消息
NOP 不作任何處理
TOUCH 從新設置消息處理超時時間
SUB 訂閱,訂閱後才能消費消息
CLS 關閉中止消費
AUTH 受權
client和server創建鏈接後,client經過命令INENTIFY將認證信息發給服務端,若是server在啓動的時候指定了受權地址,server就會告訴client你須要認證,client就會經過命令AUTH將祕鑰發給server,server去受權地址進行驗證,驗證經過後,就能夠進行正常的消息發佈和訂閱了
 
http和https提供服務以下:
 
服務名稱
發佈單條/多條消息
topic新增/刪除/狀況topic中消息/暫停/啓動
channel新增/刪除/狀況topic中消息/暫停/啓動
nsq狀態信息
ping
啓動參數查詢和修改
tcp服務能發佈和消費消息,http/https則只能發佈消息,發佈消息最後調的是同一個接口
 
端口信息
 
協議名稱 默認端口
tcp 4150
http 4151
https 4152
 
心跳
心跳默認30秒,在認證(INENTIFY)的時候client能夠指定心跳時間間隔,server按照心跳給client發消息,消息內容:_heartbeat_,若是發送失敗,發送消息的協程就會退出,這樣server就不在給client發消息了,server若是從client讀消息失敗,接收消息的協程就會退出,關閉和client的鏈接,從channel中將client移除,這樣就不在收client發來的消息,server中也就沒有client的任何信息了
 
consumer和producter連着nsqd的同一個端口,爲何consumer能消費消息,而producter卻不會呢?
nsq是個基於發佈和訂閱的消息隊列,只有訂閱了才能消費消息,consumer和producter雖然連着同一個端口,consumer在創建鏈接後,會發送SUB命令,告訴server我要訂閱,而producter並無,consumer在發送SUB命令後還會發送RDY命令告訴server能同時處理消息的個數,當rdyCount=0時,server也不會給consumer推消息,因此SUB和RDY這兩個命令缺一不可
 
nsq消息文件的存取
nsq能夠將消息存在內存中或是文件中,存在內存的好處就是速度快,肯定就是一旦進程退出消息就丟失了,因此在實戰中消息都會寫到磁盤文件,雖然慢點但不容易丟消息
封裝消息存取文件的實如今 github.com/nsqio/go-diskqueue/diskqueue.go
topic收到消息後,能夠將消息存在內存中或是文件中,當內存channel寫滿以後就會寫入文件,當咱們把channel的buffer設置成0後,全部的消息就會寫文件
每一個topic都會啓動一個協程將其收到的消息複製給其下面的每一個channel,channel在將消息推送給consumer,channel收到topic發過來(函數調用)的消息,可將消息存入內存或是文件
消息寫入內存,topic下面的channel實際上是共享一份數據,由於數據都是自讀的,而寫入文件倒是每一個channel都有一組文件並將消息吸入,真正作到了讀時複製,每一個topic和channel都會實例化一個diskQueue,其結構以下
// diskQueue implements a filesystem backed FIFO queue
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 //已經讀的位置 writePos int64 //已經寫的位置 readFileNum int64 //正在讀的文件編號 writeFileNum int64 //正在寫的文件編號 depth int64 //沒有消費的消息數量 sync.RWMutex // instantiation time metadata name string // topicName 或者 topicName + ":" + channelName dataPath string //存消息文件的目錄 maxBytesPerFile int64 // currently this cannot change once created minMsgSize int32 //消息最小值 maxMsgSize int32 //消息最大值 syncEvery int64 // number of writes per fsync syncTimeout time.Duration // duration of time per fsync exitFlag int32 //退出標記 needSync bool //強制將文件緩衝區的數據寫入磁盤 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次讀的位置 nextReadFileNum int64 //下次讀的文件編號 readFile *os.File //正在讀的文件 writeFile *os.File //正在寫的文件 reader *bufio.Reader //讀緩衝區,默認4K writeBuf bytes.Buffer //寫緩衝區 // exposed via ReadChan() readChan chan []byte //讀channel // internal channels writeChan chan []byte //寫channel writeResponseChan chan error //寫結果通知 emptyChan chan int //刪除全部文件channel emptyResponseChan chan error //刪除通知channel exitChan chan int //退出channel exitSyncChan chan int //退出命令同步等待channel logf AppLogFunc //寫日誌 }

 

文件名命名:目錄 + topicName:channelName + .diskqueue.000001.dat
func (d *diskQueue) fileName(fileNum int64) string {
   return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
}
diskQueue在實例化的時候回初始化相關的屬性,當文件大小大於指定文件的最大值時,文件編號writeFileNum就會自增1,新來的消息就會寫入新的文件
按順序讀寫文件,每一個消息寫文件的格式是:消息長度(4字節) + 消息內容,這樣讀消息也就很容易了,先讀4字節,知道消息的長度,接着讀消息內容,下一個消息也是這樣讀,當下一個消息讀的位置大於文件的最大值時說明這個文件讀完了,能夠從下一個文件開始寫了,
寫文件是同步的,寫完以後直接反饋消息是否寫入成功,因爲文件系統的緩存緣由,系統並非把消息立刻寫入磁盤,而是寫入了文件的緩衝區,因此須要定時的將文件緩衝區的內容寫入磁盤,nsq使用了兩個策略將文件緩衝區的內容寫入磁盤。兩個策略同時進行
1:默認每2500條消息強制將文件緩存內容寫入磁盤
2:默認每兩秒強制將文件緩存內容寫入磁盤
在將消息強制寫入磁盤的同時,也會將隊列當前狀態寫入另外一個文件,若程序退出,下次啓動後就能正常進行文件的讀寫,寫入內容包括:
1:剩餘消息數量
2:正在讀的文件編號
3:讀文件偏移量
4:正在寫的文件編號
5:寫文件偏移量
磁盤文件的刪除,若是一個文件中的消息所有被消費了,那這個文件將被刪除
 
斷開重連
斷開後若是不能自動重連,那就是死都不知道怎麼死的,因此nsq是有斷開重連功能的
server短髮現斷開後,不會自動重連,鬼知道你是否是主動斷開,因此server發現斷開了,就將client的相關信息徹底刪除,就像client從沒有出現過
client斷開後會自動重連,client分consumer和producer
consumer自動重連:consumer做爲消費者就是讀,因此當讀失敗的時候,consumer會關閉讀寫功能,就斷開鏈接,當consumer收到的全部消息處理完成後,就會自動重連,注意寫失敗並不會自動重連
producer自動重連:producer做爲生產者就是寫,因此當寫失敗的時候,producer按照狀態來決定是否重連,若是發現狀態爲非鏈接狀態就鏈接,收到斷開是不會重連的,在寫失敗的時候纔會重連
 
參考資料
 
未完。。。
相關文章
相關標籤/搜索