Go實現基於WebSocket的彈幕服務

拉模式和推模式

拉模式

一、數據更新頻率低,則大多數請求是無效的
二、在線用戶量多,則服務端的查詢負載高
三、定時輪詢拉取,實時性低git

推模式

一、僅在數據更新時才須要推送
二、須要維護大量的在線長鏈接
三、數據更新後能夠當即推送github

基於webSocket推送

一、瀏覽器支持的socket編程,輕鬆維持服務端長鏈接
二、基於TCP可靠傳輸之上的協議,無需開發者關心通信細節
三、提供了高度抽象的編程接口,業務開發成本較低web

webSocket協議與交互

通信流程

客戶端->upgrade->服務端
客戶端<-switching<-服務端
客戶端->message->服務端
客戶端<-message<-服務端編程

實現http服務端

一、webSocket是http協議upgrade而來
二、使用http標準庫快速實現空接口:/wsapi

webSocket握手

一、使用webSocket.Upgrader完成協議握手,獲得webSocket長鏈接
二、操做webSocket api,讀取客戶端消息,而後原樣發送回去跨域

封裝webSocket

缺少工程化設計

一、其餘代碼模塊,沒法直接操做webSocket鏈接
二、webSocket鏈接非線程安全,併發讀/寫須要同步手段瀏覽器

隱藏細節,封裝api

一、封裝Connection結構,隱藏webSocket底層鏈接
二、封裝Connection的api,提供Send/Read/Close等線程安全接口安全

api原理(channel是線程安全的)

一、SendMessage將消息投遞到out channel
二、ReadMessage從in channel讀取消息websocket

內部原理

一、啓動讀協程,循環讀取webSocket,將消息投遞到in channel
二、啓動寫協程,循環讀取out channel,將消息寫給webSocket併發

// server.go
package main

import (
    "net/http"
    "github.com/gorilla/websocket"
    "./impl"
    "time"
)

var (
    upgrader = websocket.Upgrader{
        //容許跨域
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

func wsHandler(w http.ResponseWriter, r *http.Request) {
    var (
        wsConn *websocket.Conn
        err error
        conn *impl.Connection
        data []byte
    )

    //Upgrade:websocket
    if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
        return
    }
    if conn, err = impl.InitConnection(wsConn); err != nil {
        goto ERR
    }

    go func() {
        var (
            err error
        )
        for {
            if err =conn.WriteMessage([]byte("heartbeat")); err != nil {
                return
            }
            time.Sleep(1 * time.Second)
        }
    }()

    for {
        if data, err = conn.ReadMessage(); err != nil {
            goto ERR
        }
        if err = conn.WriteMessage(data); err != nil {
            goto ERR
        }

    }

    ERR:
        //關閉鏈接
        conn.Close()
}

func main() {
    //http:localhost:7777/ws
    http.HandleFunc("/ws", wsHandler)
    http.ListenAndServe("0.0.0.0:7777", nil)
}
// connection.go
package impl

import (
    "github.com/gorilla/websocket"
    "sync"
    "github.com/influxdata/platform/kit/errors"
)

var once sync.Once

type Connection struct {
    wsConn *websocket.Conn
    inChan chan []byte
    outChan chan []byte
    closeChan chan byte
    isClosed bool
    mutex sync.Mutex
}

func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
    conn = &Connection{
        wsConn:wsConn,
        inChan:make(chan []byte, 1000),
        outChan:make(chan []byte, 1000),
        closeChan:make(chan byte, 1),
    }

    //啓動讀協程
    go conn.readLoop()

    //啓動寫協程
    go conn.writeLoop()

    return
}

//API
func (conn *Connection) ReadMessage() (data []byte, err error) {
    select {
    case data = <- conn.inChan:
    case <- conn.closeChan:
        err = errors.New("connection is closed")
    }
    return
}

func (conn *Connection) WriteMessage(data []byte) (err error) {
    select {
    case conn.outChan <- data:
    case <- conn.closeChan:
        err = errors.New("connection is closed")
    }
    return
}

func (conn *Connection) Close() {
    // 線程安全的close,可重入
    conn.wsConn.Close()
    conn.mutex.Lock()
    if !conn.isClosed {
        close(conn.closeChan)
        conn.isClosed = true
    }
    conn.mutex.Unlock()
}

//內部實現
func (conn *Connection) readLoop() {
    var (
        data []byte
        err error
    )
    for {
        if _, data, err = conn.wsConn.ReadMessage(); err != nil {
            goto ERR
        }

        //阻塞在這裏,等待inChan有空位置
        //可是若是writeLoop鏈接關閉了,這邊沒法得知
        //conn.inChan <- data

        select {
        case conn.inChan <- data:
        case <-conn.closeChan:
            //closeChan關閉的時候,會進入此分支
            goto ERR
        }
    }
    ERR:
        conn.Close()
}

func (conn *Connection) writeLoop() {
    var (
        data []byte
        err error
    )
    for {
        select {
        case data = <- conn.outChan:
        case <- conn.closeChan:
            goto ERR

        }

        if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil {
            goto ERR
        }
        conn.outChan <- data
    }
    ERR:
        conn.Close()
}
相關文章
相關標籤/搜索