本文將介紹如何實現一個基於websocket分佈式聊天(IM)系統。php
使用golang實現websocket通信,單機能夠支持百萬鏈接,使用gin框架、nginx負載、能夠水平部署、程序內部相互通信、使用grpc通信協議。html
本文內容比較長,若是直接想clone項目體驗直接進入項目體驗 goWebSocket項目下載 ,文本從介紹webSocket是什麼開始,而後開始介紹這個項目,以及在Nginx中配置域名作webSocket的轉發,而後介紹如何搭建一個分佈式系統。前端
一、項目說明java
二、介紹webSocketnode
三、如何實現基於webSocket的長連接系統python
五、webSocket項目Nginx配置github
本文將介紹如何實現一個基於websocket聊天(IM)分佈式系統。
使用golang實現websocket通信,單機支持百萬鏈接,使用gin框架、nginx負載、能夠水平部署、程序內部相互通信、使用grpc通信協議。
WebSocket 協議在2008年誕生,2011年成爲國際標準。全部瀏覽器都已經支持了。
它的最大特色就是,服務器能夠主動向客戶端推送信息,客戶端也能夠主動向服務器發送信息,是真正的雙向平等對話,屬於服務器推送技術的一種。
ws://
無證書 wss://
配置證書的協議標識golang、java、php、node.js、python、nginx 都有不錯的支持
Android可使用java-webSocket對webSocket支持
iOS 4.2及更高版本具備WebSockets支持
目前大多數的請求都是使用HTTP,都是由客戶端發起一個請求,有服務端處理,而後返回結果,不能夠服務端主動向某一個客戶端主動發送數據
客戶端發起升級協議的請求,採用標準的HTTP報文格式,在報文中添加頭部信息
Connection: Upgrade
代表鏈接須要升級
Upgrade: websocket
須要升級到 websocket協議
Sec-WebSocket-Version: 13
協議的版本爲13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
這個是base64 encode 的值,是瀏覽器隨機生成的,與服務器響應的 Sec-WebSocket-Accept
對應
# Request Headers Connection: Upgrade Host: im.91vh.com Origin: http://im.91vh.com Pragma: no-cache Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== Sec-WebSocket-Version: 13 Upgrade: websocket
服務端接收到升級協議的請求,若是服務端支持升級協議會作以下響應
返回:
Status Code: 101 Switching Protocols
表示支持切換協議
# Response Headers Connection: upgrade Date: Fri, 09 Aug 2019 07:36:59 GMT Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E= Server: nginx/1.12.1 Upgrade: websocket
golang
成功的 main
函數中用協程的方式去啓動程序go websocket.StartWebSocket()
// 啓動程序 func StartWebSocket() { http.HandleFunc("/acc", wsPage) http.ListenAndServe(":8089", nil) }
func wsPage(w http.ResponseWriter, req *http.Request) { // 升級協議 conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { fmt.Println("升級協議", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"]) return true }}).Upgrade(w, req, nil) if err != nil { http.NotFound(w, req) return } fmt.Println("webSocket 創建鏈接:", conn.RemoteAddr().String()) currentTime := uint64(time.Now().Unix()) client := NewClient(conn.RemoteAddr().String(), conn, currentTime) go client.read() go client.write() // 用戶鏈接事件 clientManager.Register <- client }
// 鏈接管理 type ClientManager struct { Clients map[*Client]bool // 所有的鏈接 ClientsLock sync.RWMutex // 讀寫鎖 Users map[string]*Client // 登陸的用戶 // appId+uuid UserLock sync.RWMutex // 讀寫鎖 Register chan *Client // 鏈接鏈接處理 Login chan *login // 用戶登陸處理 Unregister chan *Client // 斷開鏈接處理程序 Broadcast chan []byte // 廣播 向所有成員發送數據 } // 初始化 func NewClientManager() (clientManager *ClientManager) { clientManager = &ClientManager{ Clients: make(map[*Client]bool), Users: make(map[string]*Client), Register: make(chan *Client, 1000), Login: make(chan *login, 1000), Unregister: make(chan *Client, 1000), Broadcast: make(chan []byte, 1000), } return }
string(debug.Stack())
打印調用堆棧信息// 向客戶端寫數據 func (c *Client) write() { defer func() { if r := recover(); r != nil { fmt.Println("write stop", string(debug.Stack()), r) } }() defer func() { clientManager.Unregister <- c c.Socket.Close() fmt.Println("Client發送數據 defer", c) }() for { select { case message, ok := <-c.Send: if !ok { // 發送數據錯誤 關閉鏈接 fmt.Println("Client發送數據 關閉鏈接", c.Addr, "ok", ok) return } c.Socket.WriteMessage(websocket.TextMessage, message) } } }
// 讀取客戶端數據 func (c *Client) read() { defer func() { if r := recover(); r != nil { fmt.Println("write stop", string(debug.Stack()), r) } }() defer func() { fmt.Println("讀取客戶端數據 關閉send", c) close(c.Send) }() for { _, message, err := c.Socket.ReadMessage() if err != nil { fmt.Println("讀取客戶端數據 錯誤", c.Addr, err) return } // 處理程序 fmt.Println("讀取客戶端數據 處理:", string(message)) ProcessData(c, message) } }
json
的數據格式發送和接收數據(人類能夠閱讀的格式在工做開發中使用是比較方便的){"seq":"1565336219141-266129","cmd":"login","data":{"userId":"馬遠","appId":101}}
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
/************************ 請求數據 **************************/ // 通用請求數據格式 type Request struct { Seq string `json:"seq"` // 消息的惟一Id Cmd string `json:"cmd"` // 請求命令字 Data interface{} `json:"data,omitempty"` // 數據 json } // 登陸請求數據 type Login struct { ServiceToken string `json:"serviceToken"` // 驗證用戶是否登陸 AppId uint32 `json:"appId,omitempty"` UserId string `json:"userId,omitempty"` } // 心跳請求數據 type HeartBeat struct { UserId string `json:"userId,omitempty"` }
/************************ 響應數據 **************************/ type Head struct { Seq string `json:"seq"` // 消息的Id Cmd string `json:"cmd"` // 消息的cmd 動做 Response *Response `json:"response"` // 消息體 } type Response struct { Code uint32 `json:"code"` CodeMsg string `json:"codeMsg"` Data interface{} `json:"data"` // 數據 json }
// Websocket 路由 func WebsocketInit() { websocket.Register("login", websocket.LoginController) websocket.Register("heartbeat", websocket.HeartbeatController) }
沒有登陸的鏈接和登陸的鏈接6分鐘沒有心跳則斷開鏈接
client_manager.go
// 定時清理超時鏈接 func ClearTimeoutConnections() { currentTime := uint64(time.Now().Unix()) for client := range clientManager.Clients { if client.IsHeartbeatTimeout(currentTime) { fmt.Println("心跳時間超時 關閉鏈接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime) client.Socket.Close() } } }
write()
Goroutine寫入數據失敗,關閉c.Socket.Close()
鏈接,會關閉read()
Goroutineread()
Goroutine讀取數據失敗,關閉close(c.Send)
鏈接,會關閉write()
Goroutine
關閉讀寫的Goroutine
從ClientManager
刪除鏈接
十個內存溢出有九個和Goroutine有關
添加一個http的接口,能夠查看系統的狀態,防止Goroutine不回收
查看系統狀態
ws = new WebSocket("ws://127.0.0.1:8089/acc"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); data_array = JSON.parse(evt.data); console.log( data_array); }; ws.onclose = function(evt) { console.log("Connection closed."); };
登陸: ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}'); 心跳: ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}'); 關閉鏈接: ws.close();
# 主要使用到的包 github.com/gin-gonic/gin@v1.4.0 github.com/go-redis/redis github.com/gorilla/websocket github.com/spf13/viper google.golang.org/grpc github.com/golang/protobuf
git clone git@github.com:link1st/gowebsocket.git # 或 git clone https://github.com/link1st/gowebsocket.git
cd gowebsocket cd config mv app.yaml.example app.yaml # 修改項目監聽端口,redis鏈接等(默認127.0.0.1:3306) vim app.yaml # 返回項目目錄,爲之後啓動作準備 cd ..
app: logFile: log/gin.log # 日誌文件位置 httpPort: 8080 # http端口 webSocketPort: 8089 # webSocket端口 rpcPort: 9001 # 分佈式部署程序內部通信端口 httpUrl: 127.0.0.1:8080 webSocketUrl: 127.0.0.1:8089 redis: addr: "localhost:6379" password: "" DB: 0 poolSize: 30 minIdleConns: 30
go run main.go
http://127.0.0.1:8080/home/index
upstream go-im { server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s; keepalive 16; } upstream go-acc { server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s; keepalive 16; } server { listen 80 ; server_name im.91vh.com; index index.html index.htm ; location /acc { proxy_set_header Host $host; proxy_pass http://go-acc; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Connection ""; proxy_redirect off; proxy_intercept_errors on; client_max_body_size 10m; } location / { proxy_set_header Host $host; proxy_pass http://go-im; proxy_http_version 1.1; proxy_set_header Connection ""; proxy_redirect off; proxy_intercept_errors on; client_max_body_size 30m; } access_log /link/log/nginx/access/im.log; error_log /link/log/nginx/access/im.error.log; }
/link/server/tengine/sbin/nginx -t
nginx: [emerg] unknown "connection_upgrade" variable configuration file /link/server/tengine/conf/nginx.conf test failed
http{ fastcgi_temp_file_write_size 128k; ..... # 須要添加的內容 #support websocket map $http_upgrade $connection_upgrade { default upgrade; '' close; } ..... gzip on; }
ulimit -n 1000000
vim /etc/sysctl.conf net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 0
在線用戶數 | cup | 內存 | I/O | net.out |
---|---|---|---|---|
1W | ||||
10W | ||||
100W |
獲取所有在線的用戶,查詢單前服務的所有用戶+集羣中服務的所有用戶
發送消息,這裏採用的是http接口發送(微信網頁版發送消息也是http接口),這裏考慮主要是兩點:
1.服務分離,讓acc系統儘可能的簡單一點,不摻雜其它業務邏輯
2.發送消息是走http接口,不使用webSocket鏈接,才用收和發送數據分離的方式,能夠加快收發數據的效率
IM實現細節: