用 Go 編寫一個簡單的 WebSocket 推送服務

本文中代碼能夠在 github.com/alfred-zhong/wserver 獲取。html


最近拿到需求要在網頁上展現報警信息。以往報警信息都是經過短信,微信和 App 推送給用戶的,如今要讓登陸用戶在網頁端也能實時接收到報警推送。html5

依稀記得之前工做的時候遇到過相似的需求。由於之前的瀏覽器標準比較陳舊,而且那時用 Java 較多,因此那時候解決這個問題就用了 Comet4J。具體的原理就是長輪詢,長連接。但如今畢竟 html5 流行開來了,IE 都被 Edge 接替了,再用之前這種技術就顯得過期。jquery

很早之前就聽過 WebSocket 的大名,但由於那時不少用戶的瀏覽器還不支持,因此對這個技術也就是淺嘗輒止,沒有太深刻研究過。如今趁着項目須要,就來稍微深刻了解一下。git

websocket 簡介

以往瀏覽器要獲取服務端數據,都是經過發送 HTTP 請求,而後等待服務端迴應的。也就是說瀏覽器端一直是整個請求的發起者,只有它主動,才能獲取到數據。而要讓瀏覽器一側可以獲取到服務端的實時數據,就須要不停地向服務端發起請求。雖然大多數狀況下並無獲取到實際數據,但這大大增長了網絡壓力,對於服務端來講壓力也直線上升。github

後來咱們學會了使用長鏈接 + 長輪詢的方式。換句話說,也就是延長 HTTP 請求的存在時間,儘可能保持 HTTP 鏈接。雖然這在必定程度上下降了很多壓力,但仍然須要不停地進行輪詢,也作不到真正的實時性。(借用一張圖)golang


隨着 HTML5 的到來,WebSocket 在 2011 年被定爲標準(詳情請參見 RFC 6455)。web

借用 《Go Web 編程》的話。WebSocket 採用了一些特殊的報頭,使得瀏覽器和服務器只須要作一個握手的動做,就能夠在瀏覽器和服務器之間創建一條鏈接通道。且此鏈接會保持在活動狀態,你可使用 JavaScript 來向鏈接寫入或從中接收數據,就像在使用一個常規的 TCP Socket 同樣。它解決了 Web 實時化的問題。ajax

因爲 WebSocket 是全雙工通訊,因此當創建了 WebSocket 鏈接以後,接下來的通訊就相似於傳統的 TCP 通訊了。客戶端和服務端能夠相互發送數據,再也不有實時性的問題。編程


在 Go 官方的 SDK 中,並不包含對 WebSocket 的支持,因此必須使用第三方庫。json

要使用 Golang 開發 WebSocket,選擇基本就在 x/net/websocketgorilla/websocket 之間。《Go Web 編程》一書中的例子使用了 x/net/websocket 做爲開發包,並且貌似它也更加官方且正式。而實際根據我在網上查詢獲得的反饋看來,並不是如此。x/net/websocket 貌似 Bug 較多,且較爲不穩定,問題解決也並不及時。相比之下,gorilla/websocket 則更加優秀。

還有對於 Gorilla web toolkit 組織的貢獻,必須予以感謝。🙏。其下不只有 WebSocket 的實現,也有一些其餘工具。歡迎你們使用而且可以給予反饋或貢獻。




server 啓動之後會註冊兩個 Handler。

  • websocketHandler 用於提供瀏覽器端發送 Upgrade 請求並升級爲 WebSocket 鏈接。
  • pushHandler 用於提供外部推送端發送推送數據的請求。

瀏覽器首先鏈接 websocketHandler (默認地址爲 ws://ip:port/ws)升級請求爲 WebSocket 鏈接,當鏈接創建以後須要發送註冊信息進行註冊。這裏註冊信息中包含一個 token 信息。server 會對提供的 token 進行驗證並獲取到相應的 userId(一般來講,一個 userId 可能同時關聯許多 token),並保存維護好 token, userId 和 conn(鏈接)之間的關係。

推送端發送推送數據的請求到 pushHandler(默認地址爲 ws://ip:port/push),請求中包含了 userId 字段和 message 字段。server 會根據 userId 獲取到全部此時鏈接到該 server 的 conn,而後將 message 一一進行推送。



我在此處會稍微講述一下代碼的基本構成,也順便說說 Go 語言中一些經常使用的寫法和模式(本人也是從其餘語言轉向 Go 語言,畢竟 Go 語言也至關年輕。因此有建議的話,敬請提出。)。因爲 Go 語言的發明人和一些主要維護者大都來自於 C/C++ 語言,因此 Go 語言的代碼也更偏向於 C/C++ 系。

首先先看一下 Server 的結構:

// Server defines parameters for running websocket server.
type Server struct {
    // Address for server to listen on
    Addr string

    // Path for websocket request, default "/ws".
    WSPath string

    // Path for push message, default "/push".
    PushPath string

    // Upgrader is for upgrade connection to websocket connection using
    // "github.com/gorilla/websocket".
    // If Upgrader is nil, default upgrader will be used. Default upgrader is
    // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
    // returns true.
    Upgrader *websocket.Upgrader

    // Check token if it's valid and return userID. If token is valid, userID
    // must be returned and ok should be true. Otherwise ok should be false.
    AuthToken func(token string) (userID string, ok bool)

    // Authorize push request. Message will be sent if it returns true,
    // otherwise the request will be discarded. Default nil and push request
    // will always be accepted.
    PushAuth func(r *http.Request) bool

    wh *websocketHandler
    ph *pushHandler

PS: 因爲我整個項目的註釋都是用英文寫的,因此見諒了,但願不妨礙閱讀。

這裏說一下 Upgrader *websocket.Upgrader,這是 gorilla/websocket 包的對象,它用來升級 HTTP 請求。

若是一個結構體參數過多,一般不建議直接初始化,而是使用它提供的 New 方法。這裏是:

// NewServer creates a new Server.
func NewServer(addr string) *Server {
    return &Server{
        Addr:     addr,
        WSPath:   serverDefaultWSPath,
        PushPath: serverDefaultPushPath,

這也是 Go 語言對外提供初始化方法的一種常見用法。

而後 Server 使用 ListenAndServe 方法啓動並監聽端口,與 http 包的使用相似:

// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
    b := &binder{
        userID2EventConnMap: make(map[string]*[]eventConn),
        connID2UserIDMap:    make(map[string]string),

    // websocket request handler
    wh := websocketHandler{
        upgrader: defaultUpgrader,
        binder:   b,
    if s.Upgrader != nil {
        wh.upgrader = s.Upgrader
    if s.AuthToken != nil {
        wh.calcUserIDFunc = s.AuthToken
    s.wh = &wh
    http.Handle(s.WSPath, s.wh)

    // push request handler
    ph := pushHandler{
        binder: b,
    if s.PushAuth != nil {
        ph.authFunc = s.PushAuth
    s.ph = &ph
    http.Handle(s.PushPath, s.ph)

    return http.ListenAndServe(s.Addr, nil)

這裏咱們生成了兩個 Handler,分別爲 websocketHandlerpushHandlerwebsocketHandler 負責與瀏覽器創建鏈接並傳輸數據,而 pushHandler 則處理推送端的請求。能夠看到,這裏兩個 Handler 都封裝了一個 binder 對象。這個 binder 用於維護 token <-> userID <-> Conn 的關係:

// binder is defined to store the relation of userID and eventConn
type binder struct {
    mu sync.RWMutex

    // map stores key: userID and value of related slice of eventConn
    userID2EventConnMap map[string]*[]eventConn

    // map stores key: connID and value: userID
    connID2UserIDMap map[string]string


具體看一下 websocketHandler 的實現。

// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
    // upgrader is used to upgrade request.
    upgrader *websocket.Upgrader

    // binder stores relations about websocket connection and userID.
    binder *binder

    // calcUserIDFunc defines to calculate userID by token. The userID will
    // be equal to token if this function is nil.
    calcUserIDFunc func(token string) (userID string, ok bool)

很簡單的結構。websocketHandler 實現了 http.Handler 接口:

// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    wsConn, err := wh.upgrader.Upgrade(w, r, nil)
    if err != nil {
    defer wsConn.Close()

    // handle Websocket request
    conn := NewConn(wsConn)
    conn.AfterReadFunc = func(messageType int, r io.Reader) {
        var rm RegisterMessage
        decoder := json.NewDecoder(r)
        if err := decoder.Decode(&rm); err != nil {

        // calculate userID by token
        userID := rm.Token
        if wh.calcUserIDFunc != nil {
            uID, ok := wh.calcUserIDFunc(rm.Token)
            if !ok {
            userID = uID

        // bind
        wh.binder.Bind(userID, rm.Event, conn)
    conn.BeforeCloseFunc = func() {
        // unbind


首先將傳入的 http.Request 轉換爲 websocket.Conn,再將其分裝爲咱們自定義的一個 wserver.Conn(封裝,或者說是組合,是 Go 語言的典型用法。記住,Go 語言沒有繼承,只有組合)。而後設置了 ConnAfterReadFuncBeforeCloseFunc 方法,接着啓動了 conn.Listen()AfterReadFunc 意思是當 Conn 讀取到數據後,嘗試驗證並根據 token 計算 userID,然乎 bind 註冊綁定。BeforeCloseFunc 則爲 Conn 關閉前進行解綁操做。


pushHandler 則容易理解。它解析請求而後推送數據:

// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {

    // authorize
    if s.authFunc != nil {
        if ok := s.authFunc(r); !ok {

    // read request
    var pm PushMessage
    decoder := json.NewDecoder(r.Body)
    if err := decoder.Decode(&pm); err != nil {

    // validate the data
    if pm.UserID == "" || pm.Event == "" || pm.Message == "" {

    cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
    if err != nil {

    result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
    io.Copy(w, result)


Conn (此處指 wserver.Conn) 爲 websocket.Conn 的包裝。

// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
    Conn *websocket.Conn

    AfterReadFunc   func(messageType int, r io.Reader)
    BeforeCloseFunc func()

    once   sync.Once
    id     string
    stopCh chan struct{}

最主要的方法爲 Listen()

// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
    c.Conn.SetCloseHandler(func(code int, text string) error {
        if c.BeforeCloseFunc != nil {

        if err := c.Close(); err != nil {

        message := websocket.FormatCloseMessage(code, "")
        c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
        return nil

    // Keeps reading from Conn util get error.
    for {
        select {
        case <-c.stopCh:
            break ReadLoop
            messageType, r, err := c.Conn.NextReader()
            if err != nil {
                // TODO: handle read error maybe
                break ReadLoop

            if c.AfterReadFunc != nil {
                c.AfterReadFunc(messageType, r)

主要設置了當 websocket 鏈接關閉時的處理和不停地讀取數據。

文中很難全面地描述整個代碼的運做流程,像具體閱讀代碼,請前往 github.com/alfred-zhong/wserver 獲取。


代碼我已經進行了必定的測試,也已經在正式環境中運行了一段時間。可是代碼可能仍然不夠穩定,因此在使用過程當中出現問題,也實屬正常。隨意隨時歡迎你們給我提 issues 或者 PRs。

