Tao,在英文中的意思是「The ultimate principle of universe」,即「道」,它是宇宙的終極奧義。java
「道生一,一輩子二,二生三,三生無窮。」 ——《道德經》git
Tao同時也是我用Go語言開發的一個異步的TCP服務器框架(TCP Asynchronous Go server FramewOrk),秉承Go語言「Less is more」的極簡主義哲學,它能穿透一切表象,帶你一窺網絡編程的世界,讓你今後完全擺脫只會寫「socket-bind-listen-accept」的窘境。本文將簡單討論一下這個框架的設計思路以及本身的一些思考。github
你開發的產品有一套特有的業務邏輯,要經過互聯網獲得服務端的支持才能爲你的客戶提供服務。redis
怎樣快速穩定地實現產品的功能,而不須要耗費大量的時間處理各類底層的網絡通訊細節。編程
Tao提供了一種用框架支撐業務邏輯的機制。你只須要與客戶端定義好消息格式,而後將對應的業務邏輯編寫成函數註冊到框架中就能夠了。數組
讓咱們舉一個例子來看看如何使用Tao框架實現一個簡單的羣聊天服務器。服務器端代碼能夠這麼寫:安全
package main import ( "fmt" "net" "github.com/leesper/holmes" "github.com/leesper/tao" "github.com/leesper/tao/examples/chat" ) // ChatServer is the chatting server. type ChatServer struct { *tao.Server } // NewChatServer returns a ChatServer. func NewChatServer() *ChatServer { onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool { holmes.Infoln("on connect") return true }) onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) { holmes.Infoln("on error") }) onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) { holmes.Infoln("close chat client") }) return &ChatServer{ tao.NewServer(onConnectOption, onErrorOption, onCloseOption), } } func main() { defer holmes.Start().Stop() tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage) l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345)) if err != nil { holmes.Fatalln("listen error", err) } chatServer := NewChatServer() err = chatServer.Start(l) if err != nil { holmes.Fatalln("start error", err) } }
啓動一個服務器只須要三步就能完成。首先註冊消息和業務邏輯回調,其次填入IP地址和端口,最後Start一下就能夠了。這時候客戶端就可以發起鏈接,並開始聊天。業務邏輯的實現很簡單,遍歷全部的鏈接,而後發送數據:服務器
// ProcessMessage handles the Message logic. func ProcessMessage(ctx context.Context, conn tao.WriteCloser) { holmes.Infof("ProcessMessage") s, ok := tao.ServerFromContext(ctx) if ok { msg := tao.MessageFromContext(ctx) s.Broadcast(msg) } }
Go語言是「雲計算時代的C語言」,適用於開發基礎性服務,好比服務器。它語法相似C語言且標準庫豐富,上手較快,因此開發效率高;編譯速度快,運行效率接近C,因此運行效率高。網絡
Go語言面向對象編程的風格是「多用組合,少用繼承」,以匿名嵌入的方式實現繼承。好比上面的聊天服務器ChatServer:數據結構
// ChatServer is the chatting server. type ChatServer struct { *tao.Server }
因而ChatServer就自動繼承了Server全部的屬性和方法。固然,這裏是以指針的方式嵌入的。
Go語言的面向接口編程是「鴨子類型」的,即「若是我走起來像鴨子,叫起來像鴨子,那麼我就是一隻鴨子」。其餘的編程語言須要顯示地說明本身繼承某個接口,Go語言卻採起的是「隱式聲明」的方式。好比Tao框架使用的多線程日誌庫Holmes實現「每小時建立一個新日誌文件」功能的核心代碼以下:
func (ls *logSegment)Write(p []byte) (n int, err error) { if ls.timeToCreate != nil && ls.logFile != os.Stdout && ls.logFile != os.Stderr { select { case current := <-ls.timeToCreate: ls.logFile.Close() ls.logFile = nil name := getLogFileName(current) ls.logFile, err = os.Create(path.Join(ls.logPath, name)) if err != nil { fmt.Fprintln(os.Stderr, err) ls.logFile = os.Stderr } else { next := current.Truncate(ls.unit).Add(ls.unit) ls.timeToCreate = time.After(next.Sub(time.Now())) } default: // do nothing } } return ls.logFile.Write(p) }
而標準庫中的io.Writer定義以下,那麼這裏的logSegment就實現了io.Writer的接口,全部以io.Writer做爲形參的函數,我均可以傳一個logSegment的實參進去。
type Writer interface { Write(p []byte) (n int, err error) }
掌握Go語言,要把握「一箇中心,兩個基本點」。「一箇中心」是Go語言併發模型,即「不要經過共享內存來通訊,要經過通訊來共享內存」;「兩個基本點」是Go語言的併發模型的兩大基石:channel和go-routine。理解了它們就能看懂大部分代碼。下面讓咱們正式開始介紹Tao框架吧。
Tao框架支持經過tao.TLSCredsOption()函數提供傳輸層安全的TLS Server。服務器的核心職責是「監聽並接受客戶端鏈接」。每一個進程可以打開的文件描述符是有限制的,因此它還須要限制最大併發鏈接數,關鍵代碼以下:
// Start starts the TCP server, accepting new clients and creating service // go-routine for each. The service go-routines read messages and then call // the registered handlers to handle them. Start returns when failed with fatal // errors, the listener willl be closed when returned. func (s *Server) Start(l net.Listener) error { s.mu.Lock() if s.lis == nil { s.mu.Unlock() l.Close() return ErrServerClosed } s.lis[l] = true s.mu.Unlock() defer func() { s.mu.Lock() if s.lis != nil && s.lis[l] { l.Close() delete(s.lis, l) } s.mu.Unlock() }() holmes.Infof("server start, net %s addr %s\n", l.Addr().Network(), l.Addr().String()) s.wg.Add(1) go s.timeOutLoop() var tempDelay time.Duration for { rawConn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay >= max { tempDelay = max } holmes.Errorf("accept error %v, retrying in %d\n", err, tempDelay) select { case <-time.After(tempDelay): case <-s.ctx.Done(): } continue } return err } tempDelay = 0 // how many connections do we have ? sz := s.conns.Size() if sz >= MaxConnections { holmes.Warnf("max connections size %d, refuse\n", sz) rawConn.Close() continue } if s.opts.tlsCfg != nil { rawConn = tls.Server(rawConn, s.opts.tlsCfg) } netid := netIdentifier.GetAndIncrement() sc := NewServerConn(netid, s, rawConn) sc.SetName(sc.rawConn.RemoteAddr().String()) s.mu.Lock() if s.sched != nil { sc.RunEvery(s.interv, s.sched) } s.mu.Unlock() s.conns.Put(netid, sc) addTotalConn(1) s.wg.Add(1) go func() { sc.Start() }() holmes.Infof("accepted client %s, id %d, total %d\n", sc.GetName(), netid, s.conns.Size()) s.conns.RLock() for _, c := range s.conns.m { holmes.Infof("client %s\n", c.GetName()) } s.conns.RUnlock() } // for loop }
若是服務器在接受客戶端鏈接請求的時候發生了臨時錯誤,那麼服務器將等待最多1秒的時間再從新嘗試接受請求,若是現有的鏈接數超過了MaxConnections(默認1000),就拒絕並關閉鏈接,不然啓動一個新的鏈接開始工做。
Go語言在發佈1.7版時在標準庫中引入了context包。context包提供的Context結構可以在服務器,網絡鏈接以及各相關線程之間創建一種相關聯的「上下文」關係。這種上下文關係包含的信息是與某次網絡請求有關的(request scoped),所以與該請求有關的全部Go線程都能安全地訪問這個上下文結構,讀取或者寫入與上下文有關的數據。好比handleLoop線程會將某個網絡鏈接的net ID以及message打包到上下文結構中,而後連同handler函數一塊兒交給工做者線程去處理:
// handleLoop() - put handler or timeout callback into worker go-routines func handleLoop(c WriteCloser, wg *sync.WaitGroup) { //... omitted ... for { select { //... omitted ... case msgHandler := <-handlerCh: msg, handler := msgHandler.message, msgHandler.handler if handler != nil { if askForWorker { WorkerPoolInstance().Put(netID, func() { handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c) }) } } //... omitted ... } }
隨後,在工做者線程真正執行時,業務邏輯代碼就能在handler函數中獲取到message或者net ID,這些都是與本次請求有關的上下文數據,好比一個典型的echo server就會這樣處理:
// ProcessMessage process the logic of echo message. func ProcessMessage(ctx context.Context, conn tao.WriteCloser) { msg := tao.MessageFromContext(ctx).(Message) holmes.Infof("receving message %s\n", msg.Content) conn.Write(msg) }
使用context的另一個場景是實現服務器及網絡鏈接的「優雅關閉」。服務器在管理網絡鏈接時會將本身的上下文傳遞給它,而網絡鏈接啓動新線程時一樣也會將本身的上下文傳遞給這些線程,這些上下文都是可取消(cancelable)的。當服務器須要停機或者鏈接將要關閉時,只要調用cancel函數,全部這些線程就能收到通知並退出。服務器或者網絡鏈接經過阻塞等待這些線程關閉以後再關閉,就能最大限度保證正確退出。服務器關閉的關鍵代碼以下:
// Stop gracefully closes the server, it blocked until all connections // are closed and all go-routines are exited. func (s *Server) Stop() { // immediately stop accepting new clients s.mu.Lock() listeners := s.lis s.lis = nil s.mu.Unlock() for l := range listeners { l.Close() holmes.Infof("stop accepting at address %s\n", l.Addr().String()) } // close all connections conns := map[int64]*ServerConn{} s.conns.RLock() for k, v := range s.conns.m { conns[k] = v } s.conns.Clear() s.conns.RUnlock() for _, c := range conns { c.rawConn.Close() holmes.Infof("close client %s\n", c.GetName()) } s.mu.Lock() s.cancel() s.mu.Unlock() s.wg.Wait() holmes.Infoln("server stopped gracefully, bye.") os.Exit(0) }
在其餘的編程語言中,採用Reactor模式編寫的服務器每每須要在一個IO線程異步地經過epoll進行多路複用。而由於Go線程的開銷廉價,Go語言能夠對每個網絡鏈接建立三個go-routine。readLoop()負責讀取數據並反序列化成消息;writeLoop()負責序列化消息併發送二進制字節流;最後handleLoop()負責調用消息處理函數。這三個協程在鏈接建立並啓動時就會各自獨立運行:
// Start starts the server connection, creating go-routines for reading, // writing and handlng. func (sc *ServerConn) Start() { holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr()) onConnect := sc.belong.opts.onConnect if onConnect != nil { onConnect(sc) } loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop} for _, l := range loopers { looper := l sc.wg.Add(1) go looper(sc, sc.wg) } }
readLoop作了三件關鍵的工做。首先調用消息編解碼gg器將接收到的字節流反序列化成消息;而後更新用於心跳檢測的時間戳;最後,根據消息的協議號找到對應的消息處理函數,若是註冊了消息回調函數,那麼就調用該函數處理消息,不然將消息和處理函數打包發送到handlerCh中,注意其中的cDone和sDone分別是網絡鏈接和服務器上下文結構中的channel,分別用於監聽網絡鏈接和服務器的「關閉」事件通知(下同)。
/* readLoop() blocking read from connection, deserialize bytes into message, then find corresponding handler, put it into channel */ func readLoop(c WriteCloser, wg *sync.WaitGroup) { var ( rawConn net.Conn codec Codec cDone <-chan struct{} sDone <-chan struct{} setHeartBeatFunc func(int64) onMessage onMessageFunc handlerCh chan MessageHandler msg Message err error ) switch c := c.(type) { case *ServerConn: rawConn = c.rawConn codec = c.belong.opts.codec cDone = c.ctx.Done() sDone = c.belong.ctx.Done() setHeartBeatFunc = c.SetHeartBeat onMessage = c.belong.opts.onMessage handlerCh = c.handlerCh case *ClientConn: rawConn = c.rawConn codec = c.opts.codec cDone = c.ctx.Done() sDone = nil setHeartBeatFunc = c.SetHeartBeat onMessage = c.opts.onMessage handlerCh = c.handlerCh } defer func() { if p := recover(); p != nil { holmes.Errorf("panics: %v\n", p) } wg.Done() holmes.Debugln("readLoop go-routine exited") c.Close() }() for { select { case <-cDone: // connection closed holmes.Debugln("receiving cancel signal from conn") return case <-sDone: // server closed holmes.Debugln("receiving cancel signal from server") return default: msg, err = codec.Decode(rawConn) if err != nil { holmes.Errorf("error decoding message %v\n", err) if _, ok := err.(ErrUndefined); ok { // update heart beats setHeartBeatFunc(time.Now().UnixNano()) continue } return } setHeartBeatFunc(time.Now().UnixNano()) handler := GetHandlerFunc(msg.MessageNumber()) if handler == nil { if onMessage != nil { holmes.Infof("message %d call onMessage()\n", msg.MessageNumber()) onMessage(msg, c.(WriteCloser)) } else { holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber()) } continue } handlerCh <- MessageHandler{msg, handler} } } }
writeLoop作了一件事情,從sendCh中讀取已序列化好的字節流,而後發送到網絡上。可是要注意,該協程在鏈接關閉退出執行以前,會非阻塞地將sendCh中的消息所有發送完畢再退出,避免漏發消息,這就是關鍵所在。
/* writeLoop() receive message from channel, serialize it into bytes, then blocking write into connection */ func writeLoop(c WriteCloser, wg *sync.WaitGroup) { var ( rawConn net.Conn sendCh chan []byte cDone <-chan struct{} sDone <-chan struct{} pkt []byte err error ) switch c := c.(type) { case *ServerConn: rawConn = c.rawConn sendCh = c.sendCh cDone = c.ctx.Done() sDone = c.belong.ctx.Done() case *ClientConn: rawConn = c.rawConn sendCh = c.sendCh cDone = c.ctx.Done() sDone = nil } defer func() { if p := recover(); p != nil { holmes.Errorf("panics: %v\n", p) } // drain all pending messages before exit OuterFor: for { select { case pkt = <-sendCh: if pkt != nil { if _, err = rawConn.Write(pkt); err != nil { holmes.Errorf("error writing data %v\n", err) } } default: break OuterFor } } wg.Done() holmes.Debugln("writeLoop go-routine exited") c.Close() }() for { select { case <-cDone: // connection closed holmes.Debugln("receiving cancel signal from conn") return case <-sDone: // server closed holmes.Debugln("receiving cancel signal from server") return case pkt = <-sendCh: if pkt != nil { if _, err = rawConn.Write(pkt); err != nil { holmes.Errorf("error writing data %v\n", err) return } } } } }
readLoop將消息和處理函數打包發給了handlerCh,因而handleLoop就從handlerCh中取出消息和處理函數,而後交給工做者線程池,由後者負責調度執行,完成對消息的處理。這裏很好的詮釋了Go語言是如何經過channel實現Go線程間通訊的。
// handleLoop() - put handler or timeout callback into worker go-routines func handleLoop(c WriteCloser, wg *sync.WaitGroup) { var ( cDone <-chan struct{} sDone <-chan struct{} timerCh chan *OnTimeOut handlerCh chan MessageHandler netID int64 ctx context.Context askForWorker bool ) switch c := c.(type) { case *ServerConn: cDone = c.ctx.Done() sDone = c.belong.ctx.Done() timerCh = c.timerCh handlerCh = c.handlerCh netID = c.netid ctx = c.ctx askForWorker = true case *ClientConn: cDone = c.ctx.Done() sDone = nil timerCh = c.timing.timeOutChan handlerCh = c.handlerCh netID = c.netid ctx = c.ctx } defer func() { if p := recover(); p != nil { holmes.Errorf("panics: %v\n", p) } wg.Done() holmes.Debugln("handleLoop go-routine exited") c.Close() }() for { select { case <-cDone: // connectin closed holmes.Debugln("receiving cancel signal from conn") return case <-sDone: // server closed holmes.Debugln("receiving cancel signal from server") return case msgHandler := <-handlerCh: msg, handler := msgHandler.message, msgHandler.handler if handler != nil { if askForWorker { WorkerPoolInstance().Put(netID, func() { handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c) }) addTotalHandle() } else { handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c) } } case timeout := <-timerCh: if timeout != nil { timeoutNetID := NetIDFromContext(timeout.Ctx) if timeoutNetID != netID { holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID) } if askForWorker { WorkerPoolInstance().Put(netID, func() { timeout.Callback(time.Now(), c.(WriteCloser)) }) } else { timeout.Callback(time.Now(), c.(WriteCloser)) } } } } }
任何一個實現了Message接口的類型,都是一個消息,它須要提供方法訪問本身的協議號並將本身序列化成字節數組;另外,每一個消息都須要註冊本身的反序列化函數和處理函數:
// Handler takes the responsibility to handle incoming messages. type Handler interface { Handle(context.Context, interface{}) } // HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers. type HandlerFunc func(context.Context, WriteCloser) // Handle calls f(ctx, c) func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser) { f(ctx, c) } // UnmarshalFunc unmarshals bytes into Message. type UnmarshalFunc func([]byte) (Message, error) // handlerUnmarshaler is a combination of unmarshal and handle functions for message. type handlerUnmarshaler struct { handler HandlerFunc unmarshaler UnmarshalFunc } func init() { messageRegistry = map[int32]messageFunc{} buf = new(bytes.Buffer) } // Register registers the unmarshal and handle functions for msgType. // If no unmarshal function provided, the message will not be parsed. // If no handler function provided, the message will not be handled unless you // set a default one by calling SetOnMessageCallback. // If Register being called twice on one msgType, it will panics. func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser)) { if _, ok := messageRegistry[msgType]; ok { panic(fmt.Sprintf("trying to register message %d twice", msgType)) } messageRegistry[msgType] = handlerUnmarshaler{ unmarshaler: unmarshaler, handler: HandlerFunc(handler), } } // GetUnmarshalFunc returns the corresponding unmarshal function for msgType. func GetUnmarshalFunc(msgType int32) UnmarshalFunc { entry, ok := messageRegistry[msgType] if !ok { return nil } return entry.unmarshaler } // GetHandlerFunc returns the corresponding handler function for msgType. func GetHandlerFunc(msgType int32) HandlerFunc { entry, ok := messageRegistry[msgType] if !ok { return nil } return entry.handler } // Message represents the structured data that can be handled. type Message interface { MessageNumber() int32 Serialize() ([]byte, error) }
對每一個消息處理函數而言,要處理的消息以及發送該消息的客戶端都是不一樣的,這些信息被稱爲「消息上下文」,用Context結構表示,每一個不一樣的客戶端用一個64位整數netid標識:
// Context is the context info for every handler function. // Handler function handles the business logic about message. // We can find the client connection who sent this message by netid and send back responses. type Context struct{ message Message netid int64 } func NewContext(msg Message, id int64) Context { return Context{ message: msg, netid: id, } } func (ctx Context)Message() Message { return ctx.message } func (ctx Context)Id() int64 { return ctx.netid }
接收數據時,編解碼gg器(Codec)負責按照必定的格式將網絡鏈接上讀取的字節數據反序列化成消息,並將消息交給上層處理(解碼);發送數據時,編解碼gg器將上層傳遞過來的消息序列化成字節數據,交給下層發送(編碼):
// Codec is the interface for message coder and decoder. // Application programmer can define a custom codec themselves. type Codec interface { Decode(Connection) (Message, error) Encode(Message) ([]byte, error) }
Tao框架採用的是「Type-Length-Data」的格式打包數據。Type佔4個字節,表示協議類型;Length佔4個字節,表示消息長度,Data爲變長字節序列,長度由Length表示。反序列化時,由Type字段能夠肯定協議類型,而後截取Length長度的字節數據Data,並調用已註冊的反序列化函數處理。核心代碼以下:
// Codec is the interface for message coder and decoder. // Application programmer can define a custom codec themselves. type Codec interface { Decode(net.Conn) (Message, error) Encode(Message) ([]byte, error) } // TypeLengthValueCodec defines a special codec. // Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M| type TypeLengthValueCodec struct{} // Decode decodes the bytes data into Message func (codec TypeLengthValueCodec) Decode(raw net.Conn) (Message, error) { byteChan := make(chan []byte) errorChan := make(chan error) go func(bc chan []byte, ec chan error) { typeData := make([]byte, MessageTypeBytes) _, err := io.ReadFull(raw, typeData) if err != nil { ec <- err close(bc) close(ec) holmes.Debugln("go-routine read message type exited") return } bc <- typeData }(byteChan, errorChan) var typeBytes []byte select { case err := <-errorChan: return nil, err case typeBytes = <-byteChan: if typeBytes == nil { holmes.Warnln("read type bytes nil") return nil, ErrBadData } typeBuf := bytes.NewReader(typeBytes) var msgType int32 if err := binary.Read(typeBuf, binary.LittleEndian, &msgType); err != nil { return nil, err } lengthBytes := make([]byte, MessageLenBytes) _, err := io.ReadFull(raw, lengthBytes) if err != nil { return nil, err } lengthBuf := bytes.NewReader(lengthBytes) var msgLen uint32 if err = binary.Read(lengthBuf, binary.LittleEndian, &msgLen); err != nil { return nil, err } if msgLen > MessageMaxBytes { holmes.Errorf("message(type %d) has bytes(%d) beyond max %d\n", msgType, msgLen, MessageMaxBytes) return nil, ErrBadData } // read application data msgBytes := make([]byte, msgLen) _, err = io.ReadFull(raw, msgBytes) if err != nil { return nil, err } // deserialize message from bytes unmarshaler := GetUnmarshalFunc(msgType) if unmarshaler == nil { return nil, ErrUndefined(msgType) } return unmarshaler(msgBytes) } }
這裏的代碼存在一些微妙的設計,須要仔細解釋一下。TypeLengthValueCodec.Decode()函數會被readLoop協程用到。由於io.ReadFull()是同步調用,沒有數據可讀時會阻塞readLoop協程。此時若是關閉網絡鏈接,readLoop協程將沒法退出。因此這裏的代碼用到了一個小技巧:專門開闢了一個新協程來等待讀取最開始的4字節Type數據,而後本身select阻塞在多個channel上,這樣就不會忽略其餘channel傳遞過來的消息。一旦成功讀取到Type數據,就繼續後面的流程:讀取Length數據,根據Length讀取應用數據交給先前註冊好的反序列化函數。注意,若是收到超過最大長度的數據就會關閉鏈接,這是爲了防止外部程序惡意消耗系統資源。
爲了提升框架的健壯性,避免由於處理業務邏輯形成的響應延遲,消息處理函數通常都會被調度到工做者協程池執行。設計工做者協程池的一個關鍵是如何將任務散列給池子中的不一樣協程。一方面,要避免併發問題,必須保證同一個網絡鏈接發來的消息都被散列到同一個協程按順序執行;另外一方面,散列必定要是均勻的,不能讓協程「忙的忙死,閒的閒死」。關鍵仍是在散列函數的設計上。
協程池是按照單例模式設計的。建立時會調用newWorker()建立一系列worker協程。
// WorkerPool is a pool of go-routines running functions. type WorkerPool struct { workers []*worker closeChan chan struct{} } var ( globalWorkerPool *WorkerPool ) func init() { globalWorkerPool = newWorkerPool(WorkersNum) } // WorkerPoolInstance returns the global pool. func WorkerPoolInstance() *WorkerPool { return globalWorkerPool } func newWorkerPool(vol int) *WorkerPool { if vol <= 0 { vol = WorkersNum } pool := &WorkerPool{ workers: make([]*worker, vol), closeChan: make(chan struct{}), } for i := range pool.workers { pool.workers[i] = newWorker(i, 1024, pool.closeChan) if pool.workers[i] == nil { panic("worker nil") } } return pool }
給工做者協程分配任務的方式很簡單,經過hashCode()散列函數找到對應的worker協程,而後把回調函數發送到對應協程的channel中。對應協程在運行時就會從channel中取出而後執行,在start()函數中。
// Put appends a function to some worker's channel. func (wp *WorkerPool) Put(k interface{}, cb func()) error { code := hashCode(k) return wp.workers[code&uint32(len(wp.workers)-1)].put(workerFunc(cb)) } func (w *worker) start() { for { select { case <-w.closeChan: return case cb := <-w.callbackChan: before := time.Now() cb() addTotalTime(time.Since(before).Seconds()) } } } func (w *worker) put(cb workerFunc) error { select { case w.callbackChan <- cb: return nil default: return ErrWouldBlock } }
Tao框架設計了一個定時器TimingWheel,用來控制定時任務。Connection在此基礎上進行了進一步封裝。提供定時執行(RunAt),延時執行(RunAfter)和週期執行(RunEvery)功能。這裏經過定時器的設計引出多線程編程的一點經驗之談。
每一個定時任務由一個timerType表示,它帶有本身的id和包含定時回調函數的結構OnTimeOut。expiration表示該任務到期要被執行的時間,interval表示時間間隔,interval > 0意味着該任務是會被週期性重複執行的任務。
/* 'expiration' is the time when timer time out, if 'interval' > 0 the timer will time out periodically, 'timeout' contains the callback to be called when times out */ type timerType struct { id int64 expiration time.Time interval time.Duration timeout *OnTimeOut index int // for container/heap } // OnTimeOut represents a timed task. type OnTimeOut struct { Callback func(time.Time, WriteCloser) Ctx context.Context } // NewOnTimeOut returns OnTimeOut. func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut { return &OnTimeOut{ Callback: cb, Ctx: ctx, } }
定時器須要按照到期時間的順序從最近到最遠排列,這是一個自然的小頂堆,因而這裏採用標準庫container/heap建立了一個堆數據結構來組織定時任務,存取效率達到O(nlogn)。
// timerHeap is a heap-based priority queue type timerHeapType []*timerType func (heap timerHeapType) getIndexByID(id int64) int { for _, t := range heap { if t.id == id { return t.index } } return -1 } func (heap timerHeapType) Len() int { return len(heap) } func (heap timerHeapType) Less(i, j int) bool { return heap[i].expiration.UnixNano() < heap[j].expiration.UnixNano() } func (heap timerHeapType) Swap(i, j int) { heap[i], heap[j] = heap[j], heap[i] heap[i].index = i heap[j].index = j } func (heap *timerHeapType) Push(x interface{}) { n := len(*heap) timer := x.(*timerType) timer.index = n *heap = append(*heap, timer) } func (heap *timerHeapType) Pop() interface{} { old := *heap n := len(old) timer := old[n-1] timer.index = -1 *heap = old[0 : n-1] return timer }
TimingWheel在建立時會啓動一個單獨協程來運行定時器核心代碼start()。它在多個channel上進行多路複用操做:若是從cancelChan收到timerId,就執行取消操做:從堆上刪除對應的定時任務;將定時任務數量發送給sizeChan,別的線程就能獲取當前定時任務數;若是從quitChan收到消息,定時器就會被關閉而後退出;若是從addChan收到timer,就將該定時任務添加到堆;若是從tw.ticker.C收到定時信號,就調用getExpired()函數獲取到期的任務,而後將這些任務回調發送到TimeOutChannel中,其餘相關線程會經過該channel獲取並執行定時回調。最後tw.update()會更新週期性執行的定時任務,從新調度執行。
func (tw *TimingWheel) update(timers []*timerType) { if timers != nil { for _, t := range timers { if t.isRepeat() { t.expiration = t.expiration.Add(t.interval) heap.Push(&tw.timers, t) } } } } func (tw *TimingWheel) start() { for { select { case timerID := <-tw.cancelChan: index := tw.timers.getIndexByID(timerID) if index >= 0 { heap.Remove(&tw.timers, index) } case tw.sizeChan <- tw.timers.Len(): case <-tw.ctx.Done(): tw.ticker.Stop() return case timer := <-tw.addChan: heap.Push(&tw.timers, timer) case <-tw.ticker.C: timers := tw.getExpired() for _, t := range timers { tw.GetTimeOutChannel() <- t.timeout } tw.update(timers) } } }
用Tao框架開發的服務器一開始老是時不時地崩潰。有時候運行了幾個小時服務器就忽然退出了。查看打印出來的調用棧發現。每次程序都在定時器上崩潰,緣由是數組訪問越界。這就是併發訪問致使的問題,爲何呢?由於定時器的核心函數在一個協程中操做堆數據結構,與此同時其提供的添加,刪除等接口卻有可能在其餘協程中調用。多個協程併發訪問一個沒有加鎖的數據結構,必然會出現問題。解決方法很簡單:將多個協程的併發訪問轉化爲單個協程的串行訪問,也就是將添加,刪除等操做發送給不一樣的channel,而後在start()協程中統一處理:
// AddTimer adds new timed task. func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64 { if to == nil { return int64(-1) } timer := newTimer(when, interv, to) tw.addChan <- timer return timer.id } // Size returns the number of timed tasks. func (tw *TimingWheel) Size() int { return <-tw.sizeChan } // CancelTimer cancels a timed task with specified timer ID. func (tw *TimingWheel) CancelTimer(timerID int64) { tw.cancelChan <- timerID }
陳碩在他的《Linux多線程服務端編程》一書中說到,維護長鏈接的服務器都應該在應用層本身實現心跳消息:
「在嚴肅的網絡程序中,應用層的心跳協議是必不可少的。應該用心跳消息來判斷對方進程是否能正常工做。」
要使用一個鏈接來同時發送心跳和其餘業務消息,這樣一旦應用層由於出錯發不出消息,對方就可以馬上經過心跳中止感知到。值得注意的是,在Tao框架中,定時器只有一個,而客戶端鏈接可能會有不少個。在長鏈接模式下,每一個客戶端都須要處理心跳包,或者其餘類型的定時任務。將框架設計爲「每一個客戶端鏈接自帶一個定時器」是不合適的——有十萬個鏈接就有十萬個定時器,會有較高的CPU佔用率。定時器應該只有一個,全部客戶端註冊進來的定時任務都由它負責處理。可是若是全部的客戶端鏈接都等待惟一一個定時器發來的消息,就又會存在併發問題。好比client 1的定時任務到期了,但它如今正忙着處理其餘消息,這個定時任務就可能被其餘client執行。因此這裏採起了一種「先集中後分散」的處理機制:每個定時任務都由一個TimeOut結構表示,該結構中除了回調函數還包含一個context。客戶端啓動定時任務的時候都會填入net ID。TCPServer統一接收定時任務,而後從定時任務中取出net ID,而後將該定時任務交給相應的ServerConn或ClientConn去執行:
// Retrieve the extra data(i.e. net id), and then redispatch timeout callbacks // to corresponding client connection, this prevents one client from running // callbacks of other clients func (s *Server) timeOutLoop() { defer s.wg.Done() for { select { case <-s.ctx.Done(): return case timeout := <-s.timing.GetTimeOutChannel(): netID := timeout.Ctx.Value(netIDCtx).(int64) if sc, ok := s.conns.Get(netID); ok { sc.timerCh <- timeout } else { holmes.Warnf("invalid client %d\n", netID) } } } }
當咱們談論併發編程的時候,咱們在談論什麼?用一句話歸納:當多個線程同時訪問一個未受保護的共享數據時,就會產生併發問題。那麼多線程編程的本質就是怎樣避免上述狀況的發生了。這裏總結一些,有三種基本的方法。
這是教科書上最多見的方法了。用各類信號量/互斥鎖對數據結構進行保護,先加鎖,而後執行操做,最後解鎖。舉個例子,Tao框架中用於網絡鏈接管理的ConnMap就是這麼實現的:
// ConnMap is a safe map for server connection management. type ConnMap struct { sync.RWMutex m map[int64]*ServerConn } // NewConnMap returns a new ConnMap. func NewConnMap() *ConnMap { return &ConnMap{ m: make(map[int64]*ServerConn), } } // Clear clears all elements in map. func (cm *ConnMap) Clear() { cm.Lock() cm.m = make(map[int64]*ServerConn) cm.Unlock() } // Get gets a server connection with specified net ID. func (cm *ConnMap) Get(id int64) (*ServerConn, bool) { cm.RLock() sc, ok := cm.m[id] cm.RUnlock() return sc, ok } // Put puts a server connection with specified net ID in map. func (cm *ConnMap) Put(id int64, sc *ServerConn) { cm.Lock() cm.m[id] = sc cm.Unlock() } // Remove removes a server connection with specified net ID. func (cm *ConnMap) Remove(id int64) { cm.Lock() delete(cm.m, id) cm.Unlock() } // Size returns map size. func (cm *ConnMap) Size() int { cm.RLock() size := len(cm.m) cm.RUnlock() return size } // IsEmpty tells whether ConnMap is empty. func (cm *ConnMap) IsEmpty() bool { return cm.Size() <= 0 }
這種方法在前面已經介紹過,它屬於無鎖化的一種編程方式。多個線程的操做請求都放到一個任務隊列中,最終由一個單一的線程來讀取隊列並串行執行。這種方法在併發量很大的時候仍是會有性能瓶頸。
最好的辦法仍是要從數據結構上入手,有不少技巧可以讓數據結構適應多線程併發訪問的場景。好比Java標準庫中的java.util.concurrent,包含了各類併發數據結構,其中ConcurrentHashMap的基本原理就是分段鎖,對每一個段(Segment)加鎖保護,併發寫入數據時經過散列函數分發到不一樣的段上面,在SegmentA上加鎖並不影響SegmentB的訪問。
處理併發多線程問題,必定要當心再當心,思考再思考,一不注意就會踩坑