frp幾乎全部的鏈接處理都是構建在mux模塊之上的,重要性沒必要多說,來看一下這是個啥吧git
ps: 安裝方法github
go get "github.com/fatedier/golib/net/mux"
該模塊很小,不到300行,分爲兩個文件:mux.go
和rule.go
。
由於rule.go
文件相對簡單一些,咱們先來看這個。golang
首先看其中所命名的函數類型MatchFunc
:算法
type MatchFunc func(data []byte) (match bool)
該類型的函數用來判斷data
屬於什麼協議。編程
那麼具體如何判斷呢,這裏也實現了三個例子:網絡
var ( HttpsNeedBytesNum uint32 = 1 HttpNeedBytesNum uint32 = 3 YamuxNeedBytesNum uint32 = 2 ) var HttpsMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(HttpsNeedBytesNum) { return false } if data[0] == 0x16 { return true } else { return false } } // From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods var httpHeadBytes = map[string]struct{}{ "GET": struct{}{}, "HEA": struct{}{}, "POS": struct{}{}, "PUT": struct{}{}, "DEL": struct{}{}, "CON": struct{}{}, "OPT": struct{}{}, "TRA": struct{}{}, "PAT": struct{}{}, } var HttpMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(HttpNeedBytesNum) { return false } _, ok := httpHeadBytes[string(data[:3])] return ok } // From https://github.com/hashicorp/yamux/blob/master/spec.md var YamuxMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(YamuxNeedBytesNum) { return false } if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 { return true } return false }
這三個函數分別實現了區分HTTPS
,HTTP
以及go中特有的yamux
(實際上這是一個庫,能夠參考Go中的I/O多路複用)。app
先來看其中的struct
,第一個是Mux
第二個是listener
,這裏先來看一下較爲簡單的listener
。tcp
type listener struct { mux *Mux priority int needBytesNum uint32 matchFn MatchFunc c chan net.Conn mu sync.RWMutex } // Accept waits for and returns the next connection to the listener. func (ln *listener) Accept() (net.Conn, error) { ... } // Close removes this listener from the parent mux and closes the channel. func (ln *listener) Close() error { ... } func (ln *listener) Addr() net.Addr { ... }
剛看到這個結構體咱們可能很迷惑,不知道都是幹啥的,並且網絡編程中通常listener這種東西要綁定在一個套接字上,但很明顯listener
沒有,不過其惟一跟套接字相關的多是其c
字段,其是一個由net
包中的Conn
接口組成的chanel
;而後mu
字段就是讀寫鎖了,這個很簡單;而後mux
字段則是上面提到的兩個結構體中的另外一個結構體Mux
的指針;接下來到了priority
字段上,顧名思義,這個彷佛跟優先級有關係,暫且存疑;needBytesNum
則更有些蒙了,不過感受其是跟讀取byte的數量有關係,最後是matchFn
。函數
好,初步認識了這個結構體的結構後,咱們看看其方法。三個方法的listener
實現了net
模塊中的Listener
接口:ui
// A Listener is a generic network listener for stream-oriented protocols. // // Multiple goroutines may invoke methods on a Listener simultaneously. type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() Addr }
而後先來分析其Accept
方法:
func (ln *listener) Accept() (net.Conn, error) { conn, ok := <-ln.c if !ok { return nil, fmt.Errorf("network connection closed") } return conn, nil }
該方法很簡單,就是從c
這個由Conn
組成的channel
中,獲取Conn
對象,好這裏咱們就明白了,這個listener
和普通的不同,他很特別,普通的listener
監聽的是套接字,而他監聽的是channel
,另外,確定有某個地方在不停的往c
這個channel
中放Conn
。
接下來是Close
方法:
func (ln *listener) Close() error { if ok := ln.mux.release(ln); ok { // Close done to signal to any RLock holders to release their lock. close(ln.c) } return nil }
咱們暫且先把這個ln.mux.release(ln)
放到一邊,由於還不知道這個東西幹了啥,暫且只需關注close(ln.c)
,咱們知道這個函數是用來關閉channel
的,go推薦由發送端調用,但這裏彷佛listener
是一個消費端,能夠看一下如何優雅的關閉Go Channel,看來重點在於ln.mux.release(ln)
這裏,咱們暫且存疑[1],留待下面解決。
最後是Addr
方法:
func (ln *listener) Addr() net.Addr { if ln.mux == nil { return nil } ln.mux.mu.RLock() defer ln.mux.mu.RUnlock() if ln.mux.ln == nil { return nil } return ln.mux.ln.Addr() }
在這裏,mu
字段就用上了,加讀鎖,而後返回mux
字段中的ln
字段的Addr
方法。也就是這句return ln.mux.ln.Addr()
。
Mux結構體則相對來講複雜不少,先來看一下他的字段定義:
type Mux struct { ln net.Listener defaultLn *listener // sorted by priority lns []*listener maxNeedBytesNum uint32 mu sync.RWMutex }
好,第一個字段ln
是一個Listener
接口;而後defaultLn
是一個listener
的指針;lns
則是由listener
的指針組成的切片,根據註釋// sorted by priority
,咱們終於知道listener
的priority
字段是幹啥的了;接下來是maxNeedBytesNum
字段,好奇怪,比起listener
的needBytesNum
多了個「Max」,因此咱們推測這個值取得是lns
以及defaultLn
字段中全部listener
中needBytesNum
值最大的;最後的mu
字段咱們就不說了。
須要注意的是:咱們可能會發現Mux
和listener
存在相互引用,但在Go
中咱們倒也不用太擔憂,由於Go
採用「標記-回收」或者其變種的垃圾回收算法,感興趣能夠參考Golang 垃圾回收剖析
在mux.go
文件中定義了Mux
的生成函數NewMux
:
func NewMux(ln net.Listener) (mux *Mux) { mux = &Mux{ ln: ln, lns: make([]*listener, 0), } return }
很簡單,須要注意的是ln
字段存儲的通常不是listener
這樣的很是規Listener,通常是TCPListener
這樣具體的綁定了套接字的監聽器。
接下來看Mux
結構體的方法,首先看Listen
和copyLns
// priority func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener { // 1 ln := &listener{ c: make(chan net.Conn), mux: mux, priority: priority, needBytesNum: needBytesNum, matchFn: fn, } mux.mu.Lock() defer mux.mu.Unlock() // 2 if needBytesNum > mux.maxNeedBytesNum { mux.maxNeedBytesNum = needBytesNum } // 3 newlns := append(mux.copyLns(), ln) sort.Slice(newlns, func(i, j int) bool { if newlns[i].priority == newlns[j].priority { return newlns[i].needBytesNum < newlns[j].needBytesNum } return newlns[i].priority < newlns[j].priority }) mux.lns = newlns return ln } func (mux *Mux) copyLns() []*listener { lns := make([]*listener, 0, len(mux.lns)) for _, l := range mux.lns { lns = append(lns, l) } return lns }
copyLns
方法很簡單,就是跟名字的含義同樣,生成一個lns
字段的副本並返回。
Listen
基本作了三步:
listener
結構體實例,並獲取互斥鎖needBytesNum
字段listener
實例按照優先級放入lns
字段對應的slice中接下來是ListenHttp
和ListenHttps
方法:
func (mux *Mux) ListenHttp(priority int) net.Listener { return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc) } func (mux *Mux) ListenHttps(priority int) net.Listener { return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc) }
這兩個差很少,因此放到一塊兒說,基本都是專門寫了一個方法讓咱們能方便的建立處理Http
或者Https
的listener
。
再來看DefaultListener
方法:
func (mux *Mux) DefaultListener() net.Listener { mux.mu.Lock() defer mux.mu.Unlock() if mux.defaultLn == nil { mux.defaultLn = &listener{ c: make(chan net.Conn), mux: mux, } } return mux.defaultLn }
這個方法很簡單,基本就是有則返回沒有則生成而後返回的套路。不過咱們要注意defaultLn
字段中的listener
是不放入lns
字段中的。
接下來是Server
方法:
// Serve handles connections from ln and multiplexes then across registered listeners. func (mux *Mux) Serve() error { for { // Wait for the next connection. // If it returns a temporary error then simply retry. // If it returns any other error then exit immediately. conn, err := mux.ln.Accept() if err, ok := err.(interface { Temporary() bool }); ok && err.Temporary() { continue } if err != nil { return err } go mux.handleConn(conn) } }
通常來講,當咱們調用NewMux
函數之後,接下來就會調用Server
方法,該方法基本上就是阻塞監聽某個套接字,當有鏈接創建成功後當即另起一個goroutine調用handleConn
方法;當鏈接創建失敗根據err
是否含有Temporary
方法,若是有則執行並忽略錯誤,沒有則返回錯誤。
如今咱們看看handleConn
方法幹了些啥:
func (mux *Mux) handleConn(conn net.Conn) { // 1 mux.mu.RLock() maxNeedBytesNum := mux.maxNeedBytesNum lns := mux.lns defaultLn := mux.defaultLn mux.mu.RUnlock() // 2 sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum)) data := make([]byte, maxNeedBytesNum) conn.SetReadDeadline(time.Now().Add(DefaultTimeout)) _, err := io.ReadFull(rd, data) if err != nil { conn.Close() return } conn.SetReadDeadline(time.Time{}) // 3 for _, ln := range lns { if match := ln.matchFn(data); match { err = errors.PanicToError(func() { ln.c <- sharedConn }) if err != nil { conn.Close() } return } } // No match listeners if defaultLn != nil { err = errors.PanicToError(func() { defaultLn.c <- sharedConn }) if err != nil { conn.Close() } return } // No listeners for this connection, close it. conn.Close() return }
handleConn
方法也不算複雜,大致能夠分爲三步:
conn
中讀取數據,注意:shareConn
和rd
存在單向關係,若是從rd
中讀取數據的話,數據也會複製一份放到shareConn
中,反過來就不成立了與matchFunc
匹配的最高優先級的listener
,並將shareConn
放入該listener
的c
字段中,若是沒有匹配到則放到defaultLn
中的c
字段中,若是defaultLn
是nil
的話就不處理,直接關閉conn
。最後來到了release
方法了:
func (mux *Mux) release(ln *listener) bool { result := false mux.mu.Lock() defer mux.mu.Unlock() lns := mux.copyLns() for i, l := range lns { if l == ln { lns = append(lns[:i], lns[i+1:]...) result = true break } } mux.lns = lns return result }
release方法意思很明確:把對應的listener
從lns
中移除,並把結果返回,整個過程有互斥鎖,咱們回到存疑1,儘管有互斥鎖,但在這種狀況下:當某個goroutine運行到handleConn
已經執行到了第三階段的開始狀態(也就是尚未找到匹配的listener
)時,且Go
運行在多核狀態下,當另外一個goroutine運行完listener
的Close
方法時,這時就可能發生往一個已經關閉的channel
中send數據,但請注意handleConn
的第三步的這段代碼:
err = errors.PanicToError(func() { // 就是這裏了 ln.c <- sharedConn }) if err != nil { conn.Close() }
這個PanicToError
是這樣的:
func PanicToError(fn func()) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("Panic error: %v", r) } }() fn() return }
基本上就是執行了recover
而後將錯誤打印出來,結合下面的對err的判斷,就會將send失敗的conn關閉。
Mux
中包含了一個初始監聽器,基本上全部的事件(好比說新的鏈接創建,之因此叫事件是由於我實在想不出更精確的詞語了)都起源於此listener
實現了net.Listener
接口,能夠做爲二級監聽器使用(好比傳給net/http.Server
結構體的Server
方法進行處理)。Mux
包含了一個由listener
組成的有序slice,當有事件產生時就會遍歷這個slice找出合適的listener
並將事件傳給他。講到這裏基本上是完事了。整個mux
模塊仍是比較簡單的,起碼是由一個個簡單的東西組合而成。那麼一塊兒來意淫一下總體流程吧。
假如我要實現這麼一個網絡程序:
就這麼兩個很簡單的要求,不難吧。
那麼咱們一塊兒來實現吧:
type HandleFunc func(c net.Conn) (n int, err error) type MyServer struct { l net.Listener hFunc HandleFunc } func (h *MyServer) Server() (err error) { for { conn, err := h.l.Accept() if err != nil { return } go h.hFunc(conn) } } func HandleHttp(c net.Conn)(n int, err error){ n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?")) } func HandleHttps(c net.Conn)(n int, err error){ n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?")) } func main() (err error){ ln, err := net.Listen("tcp", "0.0.0.0:12345") if err != nil { err = fmt.Errorf("Create server listener error, %v", err) return } muxer = mux.NewMux(ln) var lHttp, lHttps net.Listener lHttp = muxer.ListenHttp(1) httpServer := *MyServer{lHttp, HandleHttp} lHttps = muxer.ListenHttps(2) httpsServer := *MyServer{lHttps, HandleHttps} go httpServer.Server() go httpsServer.Server() err = muxer.Serve() }