//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 //消息總數量 messageBytes uint64 //消息總長度 sync.RWMutex name string //topic name channelMap map[string]*Channel //保存topic下面的全部channel backend BackendQueue //磁盤隊列 memoryMsgChan chan *Message //內存隊列 startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 //退出標記 idFactory *guidFactory //生成msg id的工廠 ephemeral bool //是否臨時topic deleteCallback func(*Topic) //刪除topic方法指針 deleter sync.Once paused int32 //暫停標記,1暫停, 0正常 pauseChan chan int ctx *context }
// Command represents a command from a client to an NSQ daemon
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type Command struct { Name []byte //命令名稱,可選:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH Params [][]byte //不一樣的命令作不一樣解析,涉及到topic的,Params[0]爲topic name Body []byte //消息內容 } // WriteTo implements the WriterTo interface and // serializes the Command to the supplied Writer. // // It is suggested that the target Writer is buffered // to avoid performing many system calls. func (c *Command) WriteTo(w io.Writer) (int64, error) { var total int64 var buf [4]byte n, err := w.Write(c.Name) //命名名稱,nsqd根據這個名稱執行相關功能 total += int64(n) if err != nil { return total, err } for _, param := range c.Params { n, err := w.Write(byteSpace) //空格 total += int64(n) if err != nil { return total, err } n, err = w.Write(param) //參數 total += int64(n) if err != nil { return total, err } } n, err = w.Write(byteNewLine) //空行\n total += int64(n) if err != nil { return total, err } //消息內容 if c.Body != nil { bufs := buf[:] binary.BigEndian.PutUint32(bufs, uint32(len(c.Body))) n, err := w.Write(bufs) //消息長度4字節 total += int64(n) if err != nil { return total, err } n, err = w.Write(c.Body) //消息內容 total += int64(n) if err != nil { return total, err } } return total, nil }
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { if bytes.Equal(params[0], []byte("IDENTIFY")) { return p.IDENTIFY(client, params) } err := enforceTLSPolicy(client, p, params[0]) if err != nil { return nil, err } switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
type Channel struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms requeueCount uint64 //從新入隊數量 messageCount uint64 //消息數量 timeoutCount uint64 //超時數量,已經消費,但沒有反饋結果,會從新加入隊列,messageCount不會自增 sync.RWMutex topicName string //topic name name string //channel name ctx *context backend BackendQueue //將消息寫入磁盤的隊列,維護磁盤消息的讀寫 memoryMsgChan chan *Message //內存消息隊列,通道buffer默認10000 exitFlag int32 //退出標記,1表示退出,0沒有退出 exitMutex sync.RWMutex // state tracking clients map[int64]Consumer //鏈接到這個topic-channel的全部client paused int32 //暫停標記,0不暫停,1暫停,暫停就不會往這個channel中copy消息 ephemeral bool //臨時channel標記,臨時channel不會存到文件中 deleteCallback func(*Channel) //用於從topic中刪除channel deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile // TODO: these can be DRYd up deferredMessages map[MessageID]*pqueue.Item //延遲消息map,方便查找 deferredPQ pqueue.PriorityQueue //延遲消息隊列 deferredMutex sync.Mutex inFlightMessages map[MessageID]*Message //消費中的消息map,方便查找 inFlightPQ inFlightPqueue //消費中的消息隊列 inFlightMutex sync.Mutex }
// frame types const ( FrameTypeResponse int32 = 0 //響應 FrameTypeError int32 = 1 //錯誤 FrameTypeMessage int32 = 2 //消息 )
type inFlightPqueue []*Message inFlightPQ inFlightPqueue //按照超時時間排序的最小堆 inFlightMessages map[MessageID]*Message //保存消息
type Item struct { Value interface{} //*Message Priority int64 //執行的時間戳,單位毫秒 Index int //隊列索引 } type PriorityQueue []*Item deferredPQ pqueue.PriorityQueue deferredMessages map[MessageID]*pqueue.Item deferredPQ和inFlightPQ同樣,是按照時間排序的最小堆
type meta struct { Topics []struct { Name string `json:"name"` Paused bool `json:"paused"` Channels []struct { Name string `json:"name"` Paused bool `json:"paused"` } `json:"channels"` } `json:"topics"` }
服務命令 | 服務描述 |
INENTIFY | 認證 |
FIN | 消費完成 |
RDY | 指定可同時處理的消息數量 |
REQ | 消息從新加入隊列 |
PUB | 發佈單條消息 |
MPUB | 發佈多條消息 |
DPUB | 發佈單條延遲消息 |
NOP | 不作任何處理 |
TOUCH | 從新設置消息處理超時時間 |
SUB | 訂閱,訂閱後才能消費消息 |
CLS | 關閉中止消費 |
AUTH | 受權 |
服務名稱 |
發佈單條/多條消息 |
topic新增/刪除/狀況topic中消息/暫停/啓動 |
channel新增/刪除/狀況topic中消息/暫停/啓動 |
nsq狀態信息 |
ping |
啓動參數查詢和修改 |
協議名稱 | 默認端口 |
tcp | 4150 |
http | 4151 |
https | 4152 |
// diskQueue implements a filesystem backed FIFO queue
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊漢
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 //已經讀的位置 writePos int64 //已經寫的位置 readFileNum int64 //正在讀的文件編號 writeFileNum int64 //正在寫的文件編號 depth int64 //沒有消費的消息數量 sync.RWMutex // instantiation time metadata name string // topicName 或者 topicName + ":" + channelName dataPath string //存消息文件的目錄 maxBytesPerFile int64 // currently this cannot change once created minMsgSize int32 //消息最小值 maxMsgSize int32 //消息最大值 syncEvery int64 // number of writes per fsync syncTimeout time.Duration // duration of time per fsync exitFlag int32 //退出標記 needSync bool //強制將文件緩衝區的數據寫入磁盤 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次讀的位置 nextReadFileNum int64 //下次讀的文件編號 readFile *os.File //正在讀的文件 writeFile *os.File //正在寫的文件 reader *bufio.Reader //讀緩衝區,默認4K writeBuf bytes.Buffer //寫緩衝區 // exposed via ReadChan() readChan chan []byte //讀channel // internal channels writeChan chan []byte //寫channel writeResponseChan chan error //寫結果通知 emptyChan chan int //刪除全部文件channel emptyResponseChan chan error //刪除通知channel exitChan chan int //退出channel exitSyncChan chan int //退出命令同步等待channel logf AppLogFunc //寫日誌 }
func (d *diskQueue) fileName(fileNum int64) string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) }