NSQ簡介
NSQ 是實時的分佈式消息處理平臺,其設計的目的是用來大規模地處理天天數以十億計級別的消息。NSQ 具備分佈式和去中心化拓撲結構,該結構具備無單點故障、故障容錯、高可用性以及可以保證消息的可靠傳遞的特徵,是一個成熟的、已在大規模生成環境下應用的產品。golang
NSQ 由 3 個守護進程組成:
nsqd 是接收、保存和傳送消息到客戶端的守護進程。
nsqlookupd 是管理的拓撲信息,維護着全部nsqd的狀態,並提供了最終一致發現服務的守護進程。
nsqadmin 是一個 Web UI 來實時監控集羣和執行各類管理任務。 sql
這篇文章介紹主要介紹nsqd的實現。緩存
Topic與Channel
Topic與Channel是NSQ中重要的兩個概念。
生產者將消息寫到Topic中,一個Topic下能夠有多個Channel,每一個Channel都是Topic的完整副本。
消費者從Channel處訂閱消息,若是有多個消費者訂閱同一個Channel,Channel中的消息將被傳遞到一個隨機的消費者。服務器
要理解Topic Channel中各類chan的做用,關鍵是要理解golang中如何在併發環境下如何操做一個結構體(多個goroutine同時操做topic),與C/C++多線程操做同一個結構體時加鎖(mutex,rwmutex)不一樣,go語言中通常是爲這個結構體(topic,channel)開啓一個主goroutine(messagePump函數),全部對該結構體的改變的操做都應是該主goroutine完成的,也就不存在併發的問題了,其它goroutine若是想要改變這個結構體則應該向結構體提供的chan中發送消息(msgchan)或者通知(exitchan,updatechan),主goroutine會一直監聽全部的chan,當有消息或者通知到來時作相應的處理。網絡
數據的持久化
瞭解數據的持久化以前,咱們先來看兩個問題?
1. 往Topic中寫入消息就是將消息發送到Topic.memoryMsgChan中,可是memoryMsgChan是一個固定內存大小的內存隊列,若是隊列滿了怎麼辦呢?會阻塞嗎?
2. 若是消息都存放在memoryMsgChan這個內存隊列中,程序退出了消息就所有丟失了嗎?多線程
NSQ是如何解決的呢,nsq在建立Topic、Channel的時候都會建立一個DiskQueue,DiskQueue負責向磁盤文件中寫入消息、從磁盤文件中讀取消息,是NSQ實現數據持久化的最重要結構。
以Topic爲例,若是向Topic.memoryMsgChan寫入消息可是memoryMsgChan已滿時,nsq會將消息寫到topic.DiskQueue中,DiskQueue會負責將消息內存同步到磁盤上。
若是從Topic.memoryMsgChan中讀取消息時,可是memoryMsgChan並無消息時,就從topic.DiskQueue中取出同步到磁盤文件中的消息。架構
咱們看到topic.backend(diskQueue)負責將消息寫到磁盤並從磁盤中讀取消息,diskQueue提供了兩個chan供外部使用:readChan與writeChan。
咱們來看下diskQueue實現中的幾個要點。併發
- diskQueue在建立時會開啓一個goroutine,從磁盤文件中讀取消息寫到readChan中,外部goroutine能夠從readChan中獲取消息;隨時監聽writeChan,當有消息時從wirtechan中取出消息,寫到本地磁盤文件。
- diskQueue既要提供文件的讀服務又要提供文件的寫服務,因此要記錄下文件的讀位置(readIndex),寫位置(writeIndex)。每次從文件中讀取消息時使用file.Seek(readindex)定位到文件讀位置而後讀取消息信息,每次往文件中寫入消息時都要file.Seek(writeIndex)定位到寫位置再將消息寫入。
- readIndex,writeIndex很重要,程序退出時要將這些信息(meta data)寫到另外的磁盤文件(元信息文件)中,程序啓動時首先讀取元信息文件,在根據元信息文件中的readIndex writeIndex操做存儲信息的文件。
- 因爲操做系統層也有緩存,調用file.Write()寫入的信息,也可能只是存在緩存中並無同步到磁盤,須要顯示調用file.sync()才能夠強制要求操做系統把緩存同步到磁盤。能夠經過指定建立diskQueue時傳入的syncEvery,syncTimeout來控制調用file.sync()的頻率。syncTimeout是指每隔syncTimeout秒調用一次file.sync(),syncEvery是指每當寫入syncEvery個消息後調用一次file.sync()。這兩個參數均可以在啓動nsqd程序時經過命令行指定。
網絡架構
nsq是一個可靠的、高性能的服務端網絡程序,經過閱讀nsqd的源碼來學習如何搭建一個可靠的網絡服務端程序。分佈式
客戶端已成功的與服務器創建連接了,每個客戶端創建鏈接後,nsqd都會建立一個Client接口體,該結構體內保存一些client的狀態信息。
每個Client都會有兩個goroutine,一個goroutine負責讀取客戶端主動發送的各類命令,解析命令,處理命令並將處理結果回覆給客戶端。
另外一個goutine負責定時發送心跳信息給客戶端,若是客戶端訂閱某個channel的話則將channel中的將消息經過網絡發送給客戶端。函數
若是服務端不須要主動推送大量消息給客戶端,一個鏈接只須要開一個goroutine處理請求併發送回復就能夠了,這是最簡單的方式。開啓兩個goroutine操做同一個conn的話就須要注意加鎖了。
咱們來看下NSQ中幾個比較重要的命令:
- NOP 心跳回復,沒有實際意義
- PUB 發佈一個消息到 話題(topic)
PUB <topic_name>\n [ 四字節消息的大小 ][ 消息的內容 ]
- SUB 訂閱話題(topic) /通道(channel)
SUB <topic_name> <channel_name>\n
- RDY 更新 RDY 狀態 (表示客戶端已經準備好接收N 消息)
RDY <count>\n
- FIN 完成一個消息 (表示成功處理)
FIN <message_id>\n
生產者產生消息的過程比較簡單,就是一個PUB命令,先讀取四字節的消息大小,而後根據消息大小讀取消息內容,而後將內容寫到topic.MessageChan中。 咱們重點來看下消費者是如何從nsq中讀取消息的。 1. 消費者首先須要發送SUB命令,告訴nsqd它想訂閱哪一個Channel,而後nsqd將該Client與Channel創建對應關係。 2. 消費者發送RDY命令,告訴服務端它以準備好接受count個消息,服務端則向消費者發送count個消息,若是消費者想繼續接受消息就須要不斷髮送RDY命令告訴服務端本身準備好接受消息(相似TCP協議中滑動窗口的概念,消費者並非按照順序一個個的消費消息,NSQD最多能夠同時count個消息給消費者,每推送給消費者一個消息count數目減一,當消費者處理完消息回覆FIN指令時count+1)。