![]() |
![]() |
很重要--轉載聲明
- 本站文章無特別說明,皆爲原創,版權全部,轉載時請用連接的方式,給出原文出處。同時寫上原做者:朝十晚八 or Twowords
- 如要轉載,請原文轉載,如在轉載時修改本文,請事先告知,謝絕在轉載時經過修改本文達到有利於轉載者的目的。
回顧:web
之前一直是C++開發(客戶端),最近聽同事講go語言不錯,隨後便決定先從go語法開始投向go的懷抱。因爲歷史緣由學習go語法時,用了半天的時間看完了菜鳥教程上相關資料,後來又看了易百教程上的一些實例代碼,感受都比較簡單,畢竟仍是有C++基礎存在的。。。可是找工做大多都是須要工做經驗的,那麼怎麼辦纔好呢!後來在知乎上看到有一位大神推薦看NSQ和skynet開源框架,權衡之下我決定從NSQ開始學習,進入個人go學習之路。sql
要學習NSQ,首先就是上www查找相關NSQ的資料,沒想到百度一下相關資料仍是挺多,有好幾個網友的博客都寫的不錯,思路比較清晰,可是基本都沒有徹底的把NSQ分析下來,只能後邊繼續等待了。文章末尾我會把本身覺着能夠幫助理解的文章連接貼上。windows
本篇文章我不打算直接開始分析NSQ框架代碼,而是想從一些零散的地方着手,針對性的講NSQ,這樣有利於後期咱們逐模塊分析NSQ源碼。api
下邊就開始咱們文本的中心內容,文章結構是按點劃分,會比較零散但都是我的在看NSQ源碼時的心得,也能夠說是我的以爲比較難理解的地方吧,看NSQ以前,只學習了go語法2天時間,所以文中可能會涉及到一些基礎性錯誤,歡迎你們指正。瀏覽器
我我的電腦是win10 64位,所以在這兒我就給出一個網友寫好的Windows下NSQ部署文章NSQ如何在windows上安裝 ,安裝網友的文章說明個人測試結果圖1所示,當啓動nsqd鏈接nsqlookupd時,會有相應的提示,啓動nsq_to_file進程時,會往nsqd寫入消息都有相應提示,如圖1中用紅色矩形框選中的是對於的關鍵提示信息。服務器
圖1 NSQ部署測試app
打開瀏覽器直接輸入http://127.0.0.1:4171/
就能夠查看NSQ運行狀況框架
經過第一小節,咱們簡單的把NSQ部署起來,並看到了NSQ的運行狀況,還記得咱們啓動各個進程的步驟嗎,不記得不要緊,看圖2所示,該圖是出自nsq源碼分析之概述,我的覺着這幅圖對NSQ總結的很是好,從圖中咱們能夠了解到下面幾個點tcp
圖2 NSQ拓撲圖函數
瞭解了nsq的總體結構後,咱們就能夠開始按模塊分析nsq的源碼
3、NSQ啓動與退出
nsq優雅的啓動與退出使用了SVC包,推薦閱讀Nsq源碼閱讀(1) 啓動和優雅退出,這篇文章講解的很是詳細。
NSQ中簡單包裝了sync.WaitGroup,包裝後的待執行函數都會在輕量級的線程中執行,代碼以下所示。主線程中啓動了多個子線程後,只有等啓動的多個子線程結束後主線程才能結束,方法Wrap中的Add和Done調用分別會維護一個引用計數,只有當該引用計數爲0時,主線程纔會結束等待
1 //若是結構體S,包含一個匿名字段T,那麼這個結構體S 就有了T的方法。 2 type WaitGroupWrapper struct { 3 sync.WaitGroup 4 } 5 6 func (w *WaitGroupWrapper) Wrap(cb func()) { 7 w.Add(1) //sync.WaitGroup結構中方法 8 go func() { 9 cb() 10 w.Done() //sync.WaitGroup結構中方法 11 }() 12 }
Go語言式的接口,就是不用顯示聲明類型T實現了接口I,只要類型T的公開方法徹底知足接口I的要求,就能夠把類型T的對象用在須要接口I的地方。好比nsqlookupd.go文件中Main函數最後啓動http監聽服務時代碼
1 l.Lock() 2 l.httpListener = httpListener //把Listener存在NSQLookupd的struct裏 3 l.Unlock() 4 //建立httpServer的實例,httpServer在nsqlookupd\http.go文件中定義 5 httpServer := newHTTPServer(ctx) 6 //調用http_api.Serve方法(在http_api\http_server.go中定義)開始在指定的httpListener上接收http鏈接。 7 l.waitGroup.Wrap(func() { 8 //由於httpServer結構重寫了http.Handler接口類的ServeHTTP方法,所以能夠當http.Handler使用 9 http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger) 10 })
代碼第9行調用中的第二個參數httpServer,是一個自定義的struct,而http_apit.Serve須要的參數類型爲http.Handler,由於httpServer實現了http.Handler接口類中的接口,所以能夠在這個地方使用,對每個類型實現接口方法以下
1 func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { 2 s.router.ServeHTTP(w, req) 3 }
NSQD做爲消息接收、轉發者,他也是nsqlookupd的客戶端,當nsqd啓動時,除過本身啓動tcp和http服務等待生產者和消費者鏈接外,還須要做爲client鏈接到nsqlookupd。在nsqd.go文件的Main函數最後,經過waitGroup啓動了3個線程,以下代碼所示,第2行代碼啓動了一個lookupLoop循環,該循環是一個死循環,其中有一項功能就是發送心跳值,告訴全部的nsqlookupd,本身還活着。
1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循環處理消息的分發 2 n.waitGroup.Wrap(func() { n.lookupLoop() }) //同步nsqd狀態到nsqlookup好比:在線、Topic變化、Channel變化等 3 if n.getOpts().StatsdAddress != "" { 4 n.waitGroup.Wrap(func() { n.statsdLoop() }) 5 }
心跳發送代碼在lookup.go文件中,處理方式以下
1 case <-ticker: //發送心跳 告訴nsqlookup本身在線 2 // send a heartbeat and read a response (read detects closed conns) 3 for _, lookupPeer := range lookupPeers { 4 n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer) 5 cmd := nsq.Ping() 6 _, err := lookupPeer.Command(cmd) 7 if err != nil { 8 n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err) 9 } 10 }
nsqlookupd做爲服務器端,啓動時就開啓了tcp監聽,每接受一個nsqd鏈接,就使用go開啓一個handle處理函數,最終和客戶端(nsqd)交互功能代碼在LookupProtocolV1文件中完成。
此時咱們在看圖2的NSQ拓撲圖,說過了nsqd做爲client連接nsqlookupd服務器端後,咱們在順道說下nsqd開啓tcp監控做爲服務器時有哪些是客戶端,此時的client包括:生產者和消費者,即圖中的writer和reader。
先看下NSQ源碼中對Decorator定義,其實就是一個函數的嵌套定義,源碼位置在internal/http_api/api_response.go文件中,代碼以下:
1 type Decorator func(APIHandler) APIHandler 2 3 type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
下邊咱們來看兩個關於Decorate的使用
1 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { 2 decorated := f 3 for _, decorate := range ds { 4 decorated = decorate(decorated) 5 } 6 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { 7 decorated(w, req, ps) 8 } 9 } 10 11 func Log(l app.Logger) Decorator { //Logger是go對應的log4版本 12 return func(f APIHandler) APIHandler { 13 return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { 14 start := time.Now() //當前時間 15 response, err := f(w, req, ps) //執行http請求 err錯誤狀態 response處理結果 16 elapsed := time.Since(start) //f(APIHandler)調用時長 17 status := 200 18 if e, ok := err.(Err); ok { 19 status = e.Code //重置錯誤碼 20 } 21 l.Output(2, fmt.Sprintf("%d %s %s (%s) %s", 22 status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //輸出http處理日誌信息 23 return response, err //返回一個接口 和錯誤狀態 24 } 25 } 26 }
首先先來看下代碼第一行定義的Decorate函數,這個函數其實就是一個裝飾函數,第一個參數爲須要被裝飾的視圖函數,從第二參數開始,都是裝飾函數,最後返回裝飾好的視圖函數。
第11行代碼定義了一個Log函數,返回值爲Decorator類型,也就是代碼第12行return後邊的表達式,該表達式也是一個函數定義,其返回值爲APIHandler,第13行代碼return後邊的表達式爲其返回值。