Golang 做爲普遍用於服務端和雲計算領域的編程語言,tcp socket 是其中相當重要的功能。不管是 WEB 服務器仍是各種中間件都離不開 tcp socket 的支持。git
與早期的每一個線程持有一個 socket 的 block IO 模型不一樣, 多路IO複用模型使用單個線程監聽多個 socket, 當某個 socket 準備好數據後再進行響應。在邏輯上與使用 select 語句監聽多個 channel 的模式相同。github
目前主要的多路IO複用實現主要包括: SELECT, POLL 和 EPOLL。 爲了提升開發效率社區也出現不少封裝庫, 如Netty(Java), Tornado(Python) 和 libev(C)等。編程
Golang Runtime 封裝了各操做系統平臺上的多路IO複用接口, 並容許使用 goroutine 快速開發高性能的 tcp 服務器。緩存
做爲開始,咱們來實現一個簡單的 Echo 服務器。它會接受客戶端鏈接並將客戶端發送的內容原樣傳回客戶端。安全
package main import ( "fmt" "net" "io" "log" "bufio" ) func ListenAndServe(address string) { // 綁定監聽地址 listener, err := net.Listen("tcp", address) if err != nil { log.Fatal(fmt.Sprintf("listen err: %v", err)) } defer listener.Close() log.Println(fmt.Sprintf("bind: %s, start listening...", address)) for { // Accept 會一直阻塞直到有新的鏈接創建或者listen中斷纔會返回 conn, err := listener.Accept() if err != nil { // 一般是因爲listener被關閉沒法繼續監聽致使的錯誤 log.Fatal(fmt.Sprintf("accept err: %v", err)) } // 開啓新的 goroutine 處理該鏈接 go Handle(conn) } } func Handle(conn net.Conn) { // 使用 bufio 標準庫提供的緩衝區功能 reader := bufio.NewReader(conn) for { // ReadString 會一直阻塞直到遇到分隔符 '\n' // 遇到分隔符後會返回上次遇到分隔符或鏈接創建後收到的全部數據, 包括分隔符自己 // 若在遇到分隔符以前遇到異常, ReadString 會返回已收到的數據和錯誤信息 msg, err := reader.ReadString('\n') if err != nil { // 一般遇到的錯誤是鏈接中斷或被關閉,用io.EOF表示 if err == io.EOF { log.Println("connection close") } else { log.Println(err) } return } b := []byte(msg) // 將收到的信息發送給客戶端 conn.Write(b) } } func main() { ListenAndServe(":8000") }
使用 telnet 工具測試咱們編寫的 Echo 服務器:bash
$ telnet 127.0.0.1 8000 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. > a a > b b Connection closed by foreign host.
HTTP 等應用層協議只有收到一條完整的消息後才能進行處理,而工做在傳輸層的TCP協議並不瞭解應用層消息的結構。服務器
所以,可能遇到一條應用層消息分爲兩個TCP包發送或者一個TCP包中含有兩條應用層消息片斷的狀況,前者稱爲拆包後者稱爲粘包。併發
在 Echo 服務器的示例中,咱們定義用\n
表示消息結束。咱們可能遇到下列幾種狀況:socket
當咱們使用 tcp socket 開發應用層程序時必須正確處理拆包和粘包。tcp
bufio 標準庫會緩存收到的數據直到遇到分隔符纔會返回,它能夠正確處理拆包和粘包。
上層協議一般採用下列幾種思路之一來定義消息,以保證完整地進行讀取:
在生產環境下須要保證TCP服務器關閉前完成必要的清理工做,包括將完成正在進行的數據傳輸,關閉TCP鏈接等。這種關閉模式稱爲優雅關閉,能夠避免資源泄露以及客戶端未收到完整數據形成異常。
TCP 服務器的優雅關閉模式一般爲: 先關閉listener阻止新鏈接進入,而後遍歷全部鏈接逐個進行關閉。
本節完整源代碼地址: https://github.com/HDT3213/godis/tree/master/src/server
首先修改一下TCP服務器:
// handler 是應用層服務器的抽象 type Handler interface { Handle(ctx context.Context, conn net.Conn) Close()error } func ListenAndServe(cfg *Config, handler tcp.Handler) { listener, err := net.Listen("tcp", cfg.Address) if err != nil { logger.Fatal(fmt.Sprintf("listen err: %v", err)) } // 監聽中斷信號 // atomic.AtomicBool 是做者寫的封裝: https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go var closing atomic.AtomicBool sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: // 收到中斷信號後開始關閉流程 logger.Info("shuting down...") // 設置標誌位爲關閉中, 使用原子操做保證線程可見性 closing.Set(true) // listener 關閉後 listener.Accept() 會當即返回錯誤 listener.Close() } }() logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) // 在出現未知錯誤或panic後保證正常關閉 // 注意defer順序,先關閉 listener 再關閉應用層服務器 handler defer handler.Close() defer listener.Close() ctx, _ := context.WithCancel(context.Background()) for { conn, err := listener.Accept() if err != nil { if closing.Get() { // 收到關閉信號後進入此流程,此時listener已被監聽系統信號的 goroutine 關閉 // handler 會被上文的 defer 語句關閉直接返回 return } logger.Error(fmt.Sprintf("accept err: %v", err)) continue } // handle logger.Info("accept link") go handler.Handle(ctx, conn) } }
接下來修改應用層服務器:
// 客戶端鏈接的抽象 type Client struct { // tcp 鏈接 Conn net.Conn // 當服務端開始發送數據時進入waiting, 阻止其它goroutine關閉鏈接 // wait.Wait是做者編寫的帶有最大等待時間的封裝: // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go Waiting wait.Wait } type EchoHandler struct { // 保存全部工做狀態client的集合(把map當set用) // 需使用併發安全的容器 activeConn sync.Map // 和 tcp server 中做用相同的關閉狀態標識位 closing atomic.AtomicBool } func MakeEchoHandler()(*EchoHandler) { return &EchoHandler{ } } // 關閉客戶端鏈接 func (c *Client)Close()error { // 等待數據發送完成或超時 c.Waiting.WaitWithTimeout(10 * time.Second) c.Conn.Close() return nil } func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection conn.Close() } client := &Client { Conn: conn, } h.activeConn.Store(client, 1) reader := bufio.NewReader(conn) for { msg, err := reader.ReadString('\n') if err != nil { if err == io.EOF { logger.Info("connection close") h.activeConn.Delete(conn) } else { logger.Warn(err) } return } // 發送數據前先置爲waiting狀態 client.Waiting.Add(1) // 模擬關閉時未完成發送的狀況 //logger.Info("sleeping") //time.Sleep(10 * time.Second) b := []byte(msg) conn.Write(b) // 發送完畢, 結束waiting client.Waiting.Done() } } func (h *EchoHandler)Close()error { logger.Info("handler shuting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{})bool { client := key.(*Client) client.Close() return true }) return nil }