本文將介紹如何實現一個基於websocket分佈式聊天(IM)系統。php
使用golang實現websocket通信,單機能夠支持百萬鏈接,使用gin框架、nginx負載、能夠水平部署、程序內部相互通信、使用grpc通信協議。html
本文內容比較長,若是直接想clone項目體驗直接進入項目體驗 goWebSocket項目下載 ,文本從介紹webSocket是什麼開始,而後開始介紹這個項目,以及在Nginx中配置域名作webSocket的轉發,而後介紹如何搭建一個分佈式系統。前端
本文將介紹如何實現一個基於websocket聊天(IM)分佈式系統。java
使用golang實現websocket通信,單機支持百萬鏈接,使用gin框架、nginx負載、能夠水平部署、程序內部相互通信、使用grpc通信協議。node
WebSocket 協議在2008年誕生,2011年成爲國際標準。全部瀏覽器都已經支持了。python
它的最大特色就是,服務器能夠主動向客戶端推送信息,客戶端也能夠主動向服務器發送信息,是真正的雙向平等對話,屬於服務器推送技術的一種。ios
HTTP和WebSocket在通信過程的比較 nginx
HTTP和webSocket都支持配置證書,ws://
無證書 wss://
配置證書的協議標識 git
golang、java、php、node.js、python、nginx 都有不錯的支持github
Android可使用java-webSocket對webSocket支持
iOS 4.2及更高版本具備WebSockets支持
目前大多數的請求都是使用HTTP,都是由客戶端發起一個請求,有服務端處理,而後返回結果,不能夠服務端主動向某一個客戶端主動發送數據
客戶端發起升級協議的請求,採用標準的HTTP報文格式,在報文中添加頭部信息
Connection: Upgrade
代表鏈接須要升級
Upgrade: websocket
須要升級到 websocket協議
Sec-WebSocket-Version: 13
協議的版本爲13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
這個是base64 encode 的值,是瀏覽器隨機生成的,與服務器響應的 Sec-WebSocket-Accept
對應
# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket
複製代碼
服務端接收到升級協議的請求,若是服務端支持升級協議會作以下響應
返回:
Status Code: 101 Switching Protocols
表示支持切換協議
# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket
複製代碼
golang
成功的 main
函數中用協程的方式去啓動程序go websocket.StartWebSocket()
複製代碼
// 啓動程序
func StartWebSocket() {
http.HandleFunc("/acc", wsPage)
http.ListenAndServe(":8089", nil)
}
複製代碼
func wsPage(w http.ResponseWriter, req *http.Request) {
// 升級協議
conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
fmt.Println("升級協議", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])
return true
}}).Upgrade(w, req, nil)
if err != nil {
http.NotFound(w, req)
return
}
fmt.Println("webSocket 創建鏈接:", conn.RemoteAddr().String())
currentTime := uint64(time.Now().Unix())
client := NewClient(conn.RemoteAddr().String(), conn, currentTime)
go client.read()
go client.write()
// 用戶鏈接事件
clientManager.Register <- client
}
複製代碼
// 鏈接管理
type ClientManager struct {
Clients map[*Client]bool // 所有的鏈接
ClientsLock sync.RWMutex // 讀寫鎖
Users map[string]*Client // 登陸的用戶 // appId+uuid
UserLock sync.RWMutex // 讀寫鎖
Register chan *Client // 鏈接鏈接處理
Login chan *login // 用戶登陸處理
Unregister chan *Client // 斷開鏈接處理程序
Broadcast chan []byte // 廣播 向所有成員發送數據
}
// 初始化
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
Clients: make(map[*Client]bool),
Users: make(map[string]*Client),
Register: make(chan *Client, 1000),
Login: make(chan *login, 1000),
Unregister: make(chan *Client, 1000),
Broadcast: make(chan []byte, 1000),
}
return
}
複製代碼
string(debug.Stack())
打印調用堆棧信息// 向客戶端寫數據
func (c *Client) write() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}()
defer func() {
clientManager.Unregister <- c
c.Socket.Close()
fmt.Println("Client發送數據 defer", c)
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
// 發送數據錯誤 關閉鏈接
fmt.Println("Client發送數據 關閉鏈接", c.Addr, "ok", ok)
return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
}
}
}
複製代碼
// 讀取客戶端數據
func (c *Client) read() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}()
defer func() {
fmt.Println("讀取客戶端數據 關閉send", c)
close(c.Send)
}()
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
fmt.Println("讀取客戶端數據 錯誤", c.Addr, err)
return
}
// 處理程序
fmt.Println("讀取客戶端數據 處理:", string(message))
ProcessData(c, message)
}
}
複製代碼
約定發送和接收請求數據格式,爲了js處理方便,採用了json
的數據格式發送和接收數據(人類能夠閱讀的格式在工做開發中使用是比較方便的)
登陸發送數據示例:
{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"馬遠","appId":101}}
複製代碼
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
複製代碼
websocket是雙向的數據通信,能夠連續發送,若是發送的數據須要服務端回覆,就須要一個seq來肯定服務端的響應是回覆哪一次的請求數據
cmd 是用來肯定動做,websocket沒有相似於http的url,因此規定 cmd 是什麼動做
目前的動做有:login/heartbeat 用來發送登陸請求和鏈接保活(長時間沒有數據發送的長鏈接容易被瀏覽器、移動中間商、nginx、服務端程序斷開)
爲何須要AppId,UserId是表示用戶的惟一字段,設計的時候爲了作成通用性,設計AppId用來表示用戶在哪一個平臺登陸的(web、app、ios等),方便後續擴展
request_model.go 約定的請求數據格式
/************************ 請求數據 **************************/
// 通用請求數據格式
type Request struct {
Seq string `json:"seq"` // 消息的惟一Id
Cmd string `json:"cmd"` // 請求命令字
Data interface{} `json:"data,omitempty"` // 數據 json
}
// 登陸請求數據
type Login struct {
ServiceToken string `json:"serviceToken"` // 驗證用戶是否登陸
AppId uint32 `json:"appId,omitempty"`
UserId string `json:"userId,omitempty"`
}
// 心跳請求數據
type HeartBeat struct {
UserId string `json:"userId,omitempty"`
}
複製代碼
/************************ 響應數據 **************************/
type Head struct {
Seq string `json:"seq"` // 消息的Id
Cmd string `json:"cmd"` // 消息的cmd 動做
Response *Response `json:"response"` // 消息體
}
type Response struct {
Code uint32 `json:"code"`
CodeMsg string `json:"codeMsg"`
Data interface{} `json:"data"` // 數據 json
}
複製代碼
// Websocket 路由
func WebsocketInit() {
websocket.Register("login", websocket.LoginController)
websocket.Register("heartbeat", websocket.HeartbeatController)
}
複製代碼
client_manager.go
// 定時清理超時鏈接
func ClearTimeoutConnections() {
currentTime := uint64(time.Now().Unix())
for client := range clientManager.Clients {
if client.IsHeartbeatTimeout(currentTime) {
fmt.Println("心跳時間超時 關閉鏈接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)
client.Socket.Close()
}
}
}
複製代碼
write()
Goroutine寫入數據失敗,關閉c.Socket.Close()
鏈接,會關閉read()
Goroutine read()
Goroutine讀取數據失敗,關閉close(c.Send)
鏈接,會關閉write()
GoroutineClientManager
刪除鏈接ws = new WebSocket("ws://127.0.0.1:8089/acc");
ws.onopen = function(evt) {
console.log("Connection open ...");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
data_array = JSON.parse(evt.data);
console.log( data_array);
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
複製代碼
登陸:
ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');
心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');
ping 查看服務是否正常:
ws.send('{"seq":"2325","cmd":"ping","data":{}}');
關閉鏈接:
ws.close();
複製代碼
本項目是基於webSocket實現的分佈式IM系統
客戶端隨機分配用戶名,全部人進入一個聊天室,實現羣聊的功能
單臺機器(24核128G內存)支持百萬客戶端鏈接
支持水平部署,部署的機器之間能夠相互通信
項目架構圖
# 主要使用到的包
github.com/gin-gonic/gin@v1.4.0
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf
複製代碼
git clone git@github.com:link1st/gowebsocket.git
# 或
git clone https://github.com/link1st/gowebsocket.git
複製代碼
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改項目監聽端口,redis鏈接等(默認127.0.0.1:3306)
vim app.yaml
# 返回項目目錄,爲之後啓動作準備
cd ..
複製代碼
app:
logFile: log/gin.log # 日誌文件位置
httpPort: 8080 # http端口
webSocketPort: 8089 # webSocket端口
rpcPort: 9001 # 分佈式部署程序內部通信端口
httpUrl: 127.0.0.1:8080
webSocketUrl: 127.0.0.1:8089
redis:
addr: "localhost:6379"
password: ""
DB: 0
poolSize: 30
minIdleConns: 30
複製代碼
go run main.go
複製代碼
upstream go-im
{
server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
upstream go-acc
{
server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
server {
listen 80 ;
server_name im.91vh.com;
index index.html index.htm ;
location /acc {
proxy_set_header Host $host;
proxy_pass http://go-acc;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 10m;
}
location /
{
proxy_set_header Host $host;
proxy_pass http://go-im;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 30m;
}
access_log /link/log/nginx/access/im.log;
error_log /link/log/nginx/access/im.error.log;
}
複製代碼
/link/server/tengine/sbin/nginx -t
複製代碼
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
複製代碼
http{
fastcgi_temp_file_write_size 128k;
..... # 須要添加的內容
#support websocket
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
.....
gzip on;
}
複製代碼
ulimit -n 1000000
複製代碼
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
複製代碼
在線用戶數 | cup | 內存 | I/O | net.out |
---|---|---|---|---|
1W | ||||
10W | ||||
100W |
參考本項目源碼
爲了方便演示,IM系統和webSocket(acc)系統合併在一個系統中
IM系統接口: 獲取所有在線的用戶,查詢單前服務的所有用戶+集羣中服務的所有用戶 發送消息,這裏採用的是http接口發送(微信網頁版發送消息也是http接口),這裏考慮主要是兩點: 1.服務分離,讓acc系統儘可能的簡單一點,不摻雜其它業務邏輯 2.發送消息是走http接口,不使用webSocket鏈接,才用收和發送數據分離的方式,能夠加快收發數據的效率
# app.yaml 配置文件信息
app:
logFile: log/gin.log
httpPort: 8080
webSocketPort: 8089
rpcPort: 9001
httpUrl: im.91vh.com
webSocketUrl: im.91vh.com
# 在啓動項目
go run main.go
複製代碼
# 將第一個項目拷貝一份
cp -rf gowebsocket gowebsocket1
# app.yaml 修改配置文件
app:
logFile: log/gin.log
httpPort: 8081
webSocketPort: 8090
rpcPort: 9002
httpUrl: im.91vh.com
webSocketUrl: im.91vh.com
# 在啓動第二個項目
go run main.go
複製代碼
在以前Nginx配置項中添加第二臺機器的Ip和端口
upstream go-im
{
server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
server 127.0.0.1:8081 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
upstream go-acc
{
server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
server 127.0.0.1:8090 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
複製代碼
查看請求是否落在兩個項目上 實驗兩個用戶分別鏈接不一樣的項目(gowebsocket和gowebsocket1)是否也能夠相互發送消息
本項目只是演示了這個項目如何分佈式部署,以及分佈式部署之後模塊如何進行相互通信 徹底解決系統沒有單點的故障,還需 Nginx集羣、redis cluster等
IM實現細節:
github 搜:link1st 查看項目 gowebsocket