Golang 編寫 Tcp 服務器

Golang 做爲普遍用於服務端和雲計算領域的編程語言,tcp socket 是其中相當重要的功能。不管是 WEB 服務器仍是各種中間件都離不開 tcp socket 的支持。git

與早期的每一個線程持有一個 socket 的 block IO 模型不一樣, 多路IO複用模型使用單個線程監聽多個 socket, 當某個 socket 準備好數據後再進行響應。在邏輯上與使用 select 語句監聽多個 channel 的模式相同。github

目前主要的多路IO複用實現主要包括: SELECT, POLL 和 EPOLL。 爲了提升開發效率社區也出現不少封裝庫, 如Netty(Java), Tornado(Python) 和 libev(C)等。編程

Golang Runtime 封裝了各操做系統平臺上的多路IO複用接口, 並容許使用 goroutine 快速開發高性能的 tcp 服務器。緩存

Echo 服務器

做爲開始,咱們來實現一個簡單的 Echo 服務器。它會接受客戶端鏈接並將客戶端發送的內容原樣傳回客戶端。安全

package main

import (
    "fmt"
    "net"
    "io"
    "log"
    "bufio"
)

func ListenAndServe(address string) {
    // 綁定監聽地址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的鏈接創建或者listen中斷纔會返回
        conn, err := listener.Accept()
        if err != nil {
            // 一般是因爲listener被關閉沒法繼續監聽致使的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啓新的 goroutine 處理該鏈接
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    // 使用 bufio 標準庫提供的緩衝區功能
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 '\n'
        // 遇到分隔符後會返回上次遇到分隔符或鏈接創建後收到的全部數據, 包括分隔符自己
        // 若在遇到分隔符以前遇到異常, ReadString 會返回已收到的數據和錯誤信息
        msg, err := reader.ReadString('\n')
        if err != nil {
            // 一般遇到的錯誤是鏈接中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 將收到的信息發送給客戶端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}

使用 telnet 工具測試咱們編寫的 Echo 服務器:bash

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
> a
a
> b
b
Connection closed by foreign host.

拆包與粘包

HTTP 等應用層協議只有收到一條完整的消息後才能進行處理,而工做在傳輸層的TCP協議並不瞭解應用層消息的結構。服務器

所以,可能遇到一條應用層消息分爲兩個TCP包發送或者一個TCP包中含有兩條應用層消息片斷的狀況,前者稱爲拆包後者稱爲粘包。併發

在 Echo 服務器的示例中,咱們定義用\n表示消息結束。咱們可能遇到下列幾種狀況:socket

  1. 收到兩個 tcp 包: "abc", "def\n", 應發出一條響應 "abcdef\n", 這是拆包的狀況
  2. 收到一個 tcp 包: "abc\ndef\n", 應發出兩條響應 "abc\n", "def\n", 這是粘包的狀況

當咱們使用 tcp socket 開發應用層程序時必須正確處理拆包和粘包。tcp

bufio 標準庫會緩存收到的數據直到遇到分隔符纔會返回,它能夠正確處理拆包和粘包。

上層協議一般採用下列幾種思路之一來定義消息,以保證完整地進行讀取:

  • 定長消息
  • 在消息尾部添加特殊分隔符,如示例中的Echo協議和FTP控制協議
  • 將消息分爲header 和 body, 並在 header 提供消息總長度。這是應用最普遍的策略,如HTTP協議。

優雅關閉

在生產環境下須要保證TCP服務器關閉前完成必要的清理工做,包括將完成正在進行的數據傳輸,關閉TCP鏈接等。這種關閉模式稱爲優雅關閉,能夠避免資源泄露以及客戶端未收到完整數據形成異常。

TCP 服務器的優雅關閉模式一般爲: 先關閉listener阻止新鏈接進入,而後遍歷全部鏈接逐個進行關閉。

本節完整源代碼地址: https://github.com/HDT3213/godis/tree/master/src/server

首先修改一下TCP服務器:

// handler 是應用層服務器的抽象
type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close()error
}

func ListenAndServe(cfg *Config, handler tcp.Handler) {
    listener, err := net.Listen("tcp", cfg.Address)
    if err != nil {
        logger.Fatal(fmt.Sprintf("listen err: %v", err))
    }

    // 監聽中斷信號
    // atomic.AtomicBool 是做者寫的封裝: https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
    var closing atomic.AtomicBool 
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        sig := <-sigCh
        switch sig {
        case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            // 收到中斷信號後開始關閉流程
            logger.Info("shuting down...")
            // 設置標誌位爲關閉中, 使用原子操做保證線程可見性
            closing.Set(true)
            // listener 關閉後 listener.Accept() 會當即返回錯誤
            listener.Close() 
        }
    }()


    logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
    // 在出現未知錯誤或panic後保證正常關閉
    // 注意defer順序,先關閉 listener 再關閉應用層服務器 handler
    defer handler.Close()
    defer listener.Close()
    ctx, _ := context.WithCancel(context.Background())
    for {
        conn, err := listener.Accept()
        if err != nil {
            if closing.Get() {
                // 收到關閉信號後進入此流程,此時listener已被監聽系統信號的 goroutine 關閉
                // handler 會被上文的 defer 語句關閉直接返回
                return 
            }
            logger.Error(fmt.Sprintf("accept err: %v", err))
            continue
        }
        // handle
        logger.Info("accept link")
        go handler.Handle(ctx, conn)
    }
}

接下來修改應用層服務器:

// 客戶端鏈接的抽象
type Client struct {
    // tcp 鏈接
    Conn net.Conn
    // 當服務端開始發送數據時進入waiting, 阻止其它goroutine關閉鏈接
    // wait.Wait是做者編寫的帶有最大等待時間的封裝: 
    // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
    Waiting wait.Wait
}

type EchoHandler struct {
    
    // 保存全部工做狀態client的集合(把map當set用)
    // 需使用併發安全的容器
    activeConn sync.Map 

    // 和 tcp server 中做用相同的關閉狀態標識位
    closing atomic.AtomicBool
}

func MakeEchoHandler()(*EchoHandler) {
    return &EchoHandler{
    }
}

// 關閉客戶端鏈接
func (c *Client)Close()error {
    // 等待數據發送完成或超時
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
}

func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // closing handler refuse new connection
        conn.Close()
    }

    client := &Client {
        Conn: conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    for {
        msg, err := reader.ReadString('\n')
        if err != nil {
            if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(conn)
            } else {
                logger.Warn(err)
            }
            return
        }
        // 發送數據前先置爲waiting狀態
        client.Waiting.Add(1)

        // 模擬關閉時未完成發送的狀況
        //logger.Info("sleeping")
        //time.Sleep(10 * time.Second)

        b := []byte(msg)
        conn.Write(b)
        // 發送完畢, 結束waiting
        client.Waiting.Done()
    }
}

func (h *EchoHandler)Close()error {
    logger.Info("handler shuting down...")
    h.closing.Set(true)
    // TODO: concurrent wait
    h.activeConn.Range(func(key interface{}, val interface{})bool {
        client := key.(*Client)
        client.Close()
        return true
    })
    return nil
}
相關文章
相關標籤/搜索