NSQ之粗讀淺談

回顧:web

  之前一直是C++開發(客戶端),最近聽同事講go語言不錯,隨後便決定先從go語法開始投向go的懷抱。因爲歷史緣由學習go語法時,用了半天的時間看完了菜鳥教程上相關資料,後來又看了易百教程上的一些實例代碼,感受都比較簡單,畢竟仍是有C++基礎存在的。。。可是找工做大多都是須要工做經驗的,那麼怎麼辦纔好呢!後來在知乎上看到有一位大神推薦看NSQ和skynet開源框架,權衡之下我決定從NSQ開始學習,進入個人go學習之路。sql

  要學習NSQ,首先就是上www查找相關NSQ的資料,沒想到百度一下相關資料仍是挺多,有好幾個網友的博客都寫的不錯,思路比較清晰,可是基本都沒有徹底的把NSQ分析下來,只能後邊繼續等待了。文章末尾我會把本身覺着能夠幫助理解的文章連接貼上。windows

  本篇文章我不打算直接開始分析NSQ框架代碼,而是想從一些零散的地方着手,針對性的講NSQ,這樣有利於後期咱們逐模塊分析NSQ源碼。api

  下邊就開始咱們文本的中心內容,文章結構是按點劃分,會比較零散但都是我的在看NSQ源碼時的心得,也能夠說是我的以爲比較難理解的地方吧,看NSQ以前,只學習了go語法2天時間,所以文中可能會涉及到一些基礎性錯誤,歡迎你們指正瀏覽器

1、NSQ部署

  我我的電腦是win10 64位,所以在這兒我就給出一個網友寫好的Windows下NSQ部署文章NSQ如何在windows上安裝 ,安裝網友的文章說明個人測試結果圖1所示,當啓動nsqd鏈接nsqlookupd時,會有相應的提示,啓動nsq_to_file進程時,會往nsqd寫入消息都有相應提示,如圖1中用紅色矩形框選中的是對於的關鍵提示信息。服務器

圖1 NSQ部署測試app

  打開瀏覽器直接輸入http://127.0.0.1:4171/就能夠查看NSQ運行狀況框架

2、NSQ拓撲圖

  經過第一小節,咱們簡單的把NSQ部署起來,並看到了NSQ的運行狀況,還記得咱們啓動各個進程的步驟嗎,不記得不要緊,看圖2所示,該圖是出自nsq源碼分析之概述,我的覺着這幅圖對NSQ總結的很是好,從圖中咱們能夠了解到下面幾個點tcp

  • nsqlookupd進程同時開啓tcp和http兩個監聽服務,TCP監聽是用於NSQD進程鏈接,http監聽是用於提供給nsqadmin獲取集羣信息
  • nsqadmin進程只開啓http服務,其實就是一個web服務,提供給客戶端查詢集羣信息
  • nsqd進程同時開啓tcp和http服務,TPC監聽和http監聽都提供給生產者和消費者鏈接,http服務還提供給nsqadmin獲取該nsqd本地信息
  • nsqd鏈接到nsqlookupd的tcp監聽上,經過心跳告訴nsqlookupd本身在線
  • writer是生產者,直接鏈接nsqd
  • reader是消費者,直接鏈接nsqd

圖2 NSQ拓撲圖函數

  瞭解了nsq的總體結構後,咱們就能夠開始按模塊分析nsq的源碼

 3、NSQ啓動與退出

  nsq優雅的啓動與退出使用了SVC包,推薦閱讀Nsq源碼閱讀(1) 啓動和優雅退出,這篇文章講解的很是詳細。

4、全局惟一messageid

  Nsq源碼閱讀(6) 全局惟一messageid

5、NSQ同步

  NSQ中簡單包裝了sync.WaitGroup,包裝後的待執行函數都會在輕量級的線程中執行,代碼以下所示。主線程中啓動了多個子線程後,只有等啓動的多個子線程結束後主線程才能結束,方法Wrap中的Add和Done調用分別會維護一個引用計數,只有當該引用計數爲0時,主線程纔會結束等待

 1 //若是結構體S,包含一個匿名字段T,那麼這個結構體S 就有了T的方法。
 2 type WaitGroupWrapper struct {
 3     sync.WaitGroup
 4 }
 5 
 6 func (w *WaitGroupWrapper) Wrap(cb func()) {
 7     w.Add(1) //sync.WaitGroup結構中方法
 8     go func() {
 9         cb()
10         w.Done() //sync.WaitGroup結構中方法
11     }()
12 }

6、Go接口

  Go語言式的接口,就是不用顯示聲明類型T實現了接口I,只要類型T的公開方法徹底知足接口I的要求,就能夠把類型T的對象用在須要接口I的地方。好比nsqlookupd.go文件中Main函數最後啓動http監聽服務時代碼

 1 l.Lock()                                                                                     
 2 l.httpListener = httpListener //把Listener存在NSQLookupd的struct裏                                
 3 l.Unlock()                                                                                   
 4 //建立httpServer的實例,httpServer在nsqlookupd\http.go文件中定義                                         
 5 httpServer := newHTTPServer(ctx)                                                             
 6 //調用http_api.Serve方法(在http_api\http_server.go中定義)開始在指定的httpListener上接收http鏈接。                
 7 l.waitGroup.Wrap(func() {                                                                    
 8     //由於httpServer結構重寫了http.Handler接口類的ServeHTTP方法,所以能夠當http.Handler使用                       
 9     http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)                          
10 })                                                                                           

  代碼第9行調用中的第二個參數httpServer,是一個自定義的struct,而http_apit.Serve須要的參數類型爲http.Handler,由於httpServer實現了http.Handler接口類中的接口,所以能夠在這個地方使用,對每個類型實現接口方法以下

1 func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
2     s.router.ServeHTTP(w, req)
3 }

7、NSQD心跳

  NSQD做爲消息接收、轉發者,他也是nsqlookupd的客戶端,當nsqd啓動時,除過本身啓動tcp和http服務等待生產者和消費者鏈接外,還須要做爲client鏈接到nsqlookupd。在nsqd.go文件的Main函數最後,經過waitGroup啓動了3個線程,以下代碼所示,第2行代碼啓動了一個lookupLoop循環,該循環是一個死循環,其中有一項功能就是發送心跳值,告訴全部的nsqlookupd,本身還活着。

1 n.waitGroup.Wrap(func() { n.queueScanLoop() }) //循環處理消息的分發                                             
2 n.waitGroup.Wrap(func() { n.lookupLoop() })    //同步nsqd狀態到nsqlookup好比:在線、Topic變化、Channel變化等            
3 if n.getOpts().StatsdAddress != "" {                                                                   
4     n.waitGroup.Wrap(func() { n.statsdLoop() })          
5 }                                              

  心跳發送代碼在lookup.go文件中,處理方式以下

 1 case <-ticker: //發送心跳  告訴nsqlookup本身在線                                         
 2     // send a heartbeat and read a response (read detects closed conns)        
 3     for _, lookupPeer := range lookupPeers {                                   
 4         n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)                   
 5         cmd := nsq.Ping()                                                      
 6         _, err := lookupPeer.Command(cmd)                                      
 7         if err != nil {                                                        
 8             n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)         
 9         }                                                                      
10     }                                                                          

  nsqlookupd做爲服務器端,啓動時就開啓了tcp監聽,每接受一個nsqd鏈接,就使用go開啓一個handle處理函數,最終和客戶端(nsqd)交互功能代碼在LookupProtocolV1文件中完成。

  此時咱們在看圖2的NSQ拓撲圖,說過了nsqd做爲client連接nsqlookupd服務器端後,咱們在順道說下nsqd開啓tcp監控做爲服務器時有哪些是客戶端,此時的client包括:生產者和消費者,即圖中的writer和reader。

8、Decorator裝飾器

  先看下NSQ源碼中對Decorator定義,其實就是一個函數的嵌套定義,源碼位置在internal/http_api/api_response.go文件中,代碼以下:

1 type Decorator func(APIHandler) APIHandler
2 
3 type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)

  下邊咱們來看兩個關於Decorate的使用

 1 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
 2     decorated := f
 3     for _, decorate := range ds {
 4         decorated = decorate(decorated)
 5     }
 6     return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 7         decorated(w, req, ps)
 8     }
 9 }
10 
11 func Log(l app.Logger) Decorator { //Logger是go對應的log4版本
12     return func(f APIHandler) APIHandler {
13         return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
14             start := time.Now()            //當前時間
15             response, err := f(w, req, ps) //執行http請求 err錯誤狀態  response處理結果
16             elapsed := time.Since(start)   //f(APIHandler)調用時長
17             status := 200
18             if e, ok := err.(Err); ok {
19                 status = e.Code //重置錯誤碼
20             }
21             l.Output(2, fmt.Sprintf("%d %s %s (%s) %s",
22                 status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)) //輸出http處理日誌信息
23             return response, err //返回一個接口  和錯誤狀態
24         }
25     }
26 }

  首先先來看下代碼第一行定義的Decorate函數,這個函數其實就是一個裝飾函數,第一個參數爲須要被裝飾的視圖函數,從第二參數開始,都是裝飾函數,最後返回裝飾好的視圖函數。

  第11行代碼定義了一個Log函數,返回值爲Decorator類型,也就是代碼第12行return後邊的表達式,該表達式也是一個函數定義,其返回值爲APIHandler,第13行代碼return後邊的表達式爲其返回值。

 

相關資料

  NSQ如何在windows上安裝

  nsq源碼閱讀筆記之nsqd:hurray123 CSDN

  go語言nsq源碼解讀-基本介紹:熊窩窩

  Nsq源碼閱讀:胖梁的技術筆記

  nsq源碼分析之概述:羅道文的私房菜

 

若是您以爲文章不錯,不妨給個打賞,寫做不易,感謝各位的支持。您的支持是我最大的動力,謝謝!!! 

 

  


很重要--轉載聲明

  1. 本站文章無特別說明,皆爲原創,版權全部,轉載時請用連接的方式,給出原文出處。同時寫上原做者:朝十晚八 or Twowords
  2. 如要轉載,請原文轉載,如在轉載時修改本文,請事先告知,謝絕在轉載時經過修改本文達到有利於轉載者的目的。 

相關文章
相關標籤/搜索