鑑於聊天已然成爲大部分app的基礎功能,而大部分app用戶基數有沒有辣麼大,經常使用的聊天server架構如xmpp或者消息隊列實現之類的用起來還挺麻煩的,有比較難跟網頁端作交互,加之H5標準落地,因此websocket已然成爲一個輕巧可用性高的聊天server實現方法;java
websocket的server常見的是用nodejs或者java的netty框架實現,netty相對重一點,direct buffer的內存泄露調起來比較麻煩,試了一下go,輕巧,穩定性不錯,性能不錯,因此用go實現了一下;node
websocket的協議標準和基本概念網上一搜一片,這裏不贅述;git
http server用gin來作,websocket的handler則用gorilla,因爲不重複造輪子,因此整個搭建的過程很快;github
import ( "util" "os" "fmt" "github.com/DeanThompson/ginpprof" "github.com/gin-gonic/gin" "runtime" ) var ( logger * util.LogHelper ) func main() { runtime.GOMAXPROCS(runtime.NumCPU()) logFile,err := os.OpenFile("/var/log/gows.log",os.O_CREATE|os.O_RDWR,0777) if err!=nil { fmt.Println(err.Error()) os.Exit(0) } defer logFile.Close() logger = util.NewLogger(logFile) logger.Info("Starting system...") wsHandler := new(WebSocketHandler) gin.SetMode(gin.ReleaseMode) r := gin.Default() r.GET("/", func(c *gin.Context) { wsHandler.HandleConn(c.Writer, c.Request) }) ginpprof.Wrapper(r)//調試用 能夠看到堆棧狀態和全部goroutine狀態 //err = r.Run(listenPath, certPath, keyPath) 這樣能夠支持wss err = r.Run("127.0.0.1:8888") if err != nil { fmt.Println(err) } }
這樣咱們的入口就有了~web
websocket的模式大概是 onopen onmessage onerror onclose四個callback來覆蓋整個通訊流程json
因此咱們來看下簡易版本的websockethandler的實現websocket
package main import ( "bytes" "compress/gzip" "encoding/json" "errors" "net/http" "strconv" "time" "util" "github.com/gorilla/websocket" ) var ( ctxHashMap = util.NewConcurrentMap() ) //用來升級http協議到ws協議 type WebSocketHandler struct { wsupgrader websocket.Upgrader } func (wsh *WebSocketHandler) NewWebSocketHandler() { wsh.wsupgrader = websocket.Upgrader{ ReadBufferSize: 4096, WriteBufferSize: 4096, } } func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) { //處理文本消息 或者 2進制消息 2進制一般是些 gzip的文本 語音或者圖片視頻之類的通常會用其餘雲服務否則帶寬會爆掉 if msgType == websocket.TextMessage { wsh.processIncomingTextMsg(conn, ctx, msg) } if msgType == websocket.BinaryMessage { } } func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) { if err := r.ParseForm(); err != nil { return nil, errors.New("參數校驗錯誤") } specialKey := r.FormValue("specialKey") supportGzip := r.FormValue("support_gzip") ctx = &ConnContext{specialKey, supportGzip} //用來標識一個tcp連接 keyString := ctx.AsHashKey() if oldConn, ok := ctxHashMap.Get(keyString); ok { wsh.onClose(oldConn.(*websocket.Conn), ctx) oldConn.(*websocket.Conn).Close() } ctxHashMap.Set(keyString, conn) return ctx, nil } func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) { logger.Info("client close itself as " + ctx.String()) wsh.closeConnWithCtx(ctx) return } func (wsh *WebSocketHandler) onError(errMsg string) { logger.Error(errMsg) } func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) { wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool { return true } conn, err := wsh.wsupgrader.Upgrade(w, r, nil) if err != nil { logger.Error("Failed to set websocket upgrade: " + err.Error()) return } defer conn.Close() if ctx, err := wsh.onOpen(conn, r); err != nil { logger.Error("Open connection failed " + err.Error() + r.URL.RawQuery) return } else { conn.SetPingHandler(func(message string) error { conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second)) return nil }) for { t, msg, err := conn.ReadMessage() if err != nil { logger.Error("READ ERR FROM " + ctx.String() + " ERR " + err.Error()) wsh.onClose(conn, ctx) return } switch t { case websocket.TextMessage, websocket.BinaryMessage: wsh.onMessage(conn, ctx, msg, t) case websocket.CloseMessage: wsh.onClose(conn, ctx) return case websocket.PingMessage: case websocket.PongMessage: } } } } func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) { keyString := ctx.AsHashKey() ctxHashMap.Remove(keyString) return } func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) { logger.Debug("CLIENT SAID " + string(msg)) sendMessageToAll(msg) } func (wsh *WebSocketHandler) sendMessageToAll(msg []byte]) { var gzMsg bytes.Buffer gzWriter := gzip.NewWriter(&gzMsg) gzWriter.Write(msg) gzWriter.Flush() gzWriter.Close() for key, conn := range ctxHashMap.Items() { if ctx, err := HashKeyAsCtx(key.(string)); err != nil { wsh.onError(err.Error()) } else { if ctx.supportGzip == "1" { err = conn.(*websocket.Conn).WriteMessage(websocket.BinaryMessage, gzMsg.Bytes()) logger.Debug("send binary msg to " + ctx.String()) } else { err = conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, []byte(msg)) logger.Debug("send text msg to " + ctx.String()) } if err != nil { wsh.onClose(conn.(*websocket.Conn), ctx) conn.(*websocket.Conn).Close() wsh.onError("WRITE ERR TO " + key.(string) + " ERR:" + err.Error()) } } } }
由於刪了一些線上代碼的敏感信息 因此未必編譯的過,不過差很少一個意思,主要看氣質架構
裏面的一個莫名其妙的叫作ctx的東西出現了不少次實際上是connectionContext的縮寫,通常連接形如ws://ip:port/?param=value¶m1=value1之類的形式,固然會加密,因此在onopen的時候會對url作一次基礎校驗,而且回記錄url的一些關鍵參數標記,以用來確認消息到底要發送給誰app
一個簡單connContext實現以下框架
// connContext.go package main import ( "errors" "strings" "util" ) type ConnContext struct { specialKey string supportGzip string } func HashKeyAsCtx(hashKey string) (*ConnContext,error){ values := strings.Split(hashKey,":") if(len(values)!=2){ return nil,errors.New("艾瑪 key不對: "+hashKey) }else{ return &ConnContext{values[0],values[1]},nil } } func (ctx *ConnContext) AsHashKey() string{ return strings.Join([]string{ctx.specialKey, ctx.supportGzip},":") } func (ctx * ConnContext) String () string{ return util.NewStringBuilder("specialkey: ",ctx.specialkey, " gzip ",ctx.supportGzip).String() }
以上 一個簡易的websocket server 就這樣完成了 可喜可賀
有事兒尋這兒