所謂網關,其實就是維持玩家客戶端的鏈接,將玩家發的遊戲請求轉發到具體後端服務的服務器,具備如下幾個功能點:git
對於http請求來講,micro框架自己已經實現了api網關,能夠參閱以前的博客github
牌類遊戲使用微服務重構筆記(二): micro框架簡介:micro toolkitgolang
可是對於遊戲服務器,通常都是須要長連接的,須要咱們本身實現web
網關自己應該是支持多協議的,這裏就以websocket舉例說明我重構過程當中的思路,其餘協議相似。首先選擇提供websocket鏈接的庫 推薦使用melody,基於websocket庫,語法很是簡單,數行代碼便可實現websocket服務器。咱們的遊戲須要websocket網關的緣由在於客戶端不支持HTTP2,不能和grpc服務器直連後端
package main
import (
"github.com/micro/go-web"
"gopkg.in/olahol/melody.v1"
"log"
"net/http"
)
func main() {
// New web service
service := web.NewService(web.Name("go.micro.api.gateway"))
// parse command line
service.Init()
// new melody
m := melody.New()
// Handle websocket connection
service.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_ = m.HandleRequest(w, r)
})
// handle connection with new session
m.HandleConnect(func(session *melody.Session) {
})
// handle disconnection
m.HandleDisconnect(func(session *melody.Session) {
})
// handle message
m.HandleMessage(func(session *melody.Session, bytes []byte) {
})
// run service
if err := service.Run(); err != nil {
log.Fatal("Run: ", err)
}
}
複製代碼
網關能夠收取或發送數據,而且數據結構比較統一都是[]byte
,這一點是否是很像grpc stream
,所以就可使用protobuf
的oneof
特性來定義請求和響應,可參照上期博客api
牌類遊戲使用微服務重構筆記(六): protobuf爬坑bash
定義gateway.basic.proto
,對網關收/發的消息進行歸類服務器
message Message {
oneof message {
Req req = 1; // 客戶端請求 要求響應
Rsp rsp = 2; // 服務端響應
Notify notify = 3; // 客戶端推送 不要求響應
Event event = 4; // 服務端推送
Stream stream = 5; // 雙向流請求
Ping ping = 6; // ping
Pong pong = 7;// pong
}
}
複製代碼
對於req
、notify
都是客戶端的無狀態請求,對應後端的無狀態服務器,這裏僅須要實現本身的路由規則便可,好比微信
message Req {
string serviceName = 1; // 服務名
string method = 2; // 方法名
bytes args = 3; // 參數
google.protobuf.Timestamp timestamp = 4; // 時間戳
...
}
複製代碼
req-rsp
思路與micro toolkit
的api網關相似(rpc 處理器),比較簡單,可參照以前的博客。websocket
咱們的項目對於此類請求都走http了,並無經過這個網關, 僅有一些基本的req
,好比authReq
處理session
認證。主要考慮是http簡單、無狀態、好維護,再加上此類業務對實時性要求也不高。
遊戲服務器通常都是有狀態的、雙向的、實時性要求較高,req-rsp
模式並不適合,就須要網關進行轉發。每添加一種grpc後端服務器,僅須要在oneof
中添加一個stream來拓展
message Stream {
oneof stream {
room.basic.Message roomMessage = 1; // 房間服務器
game.basic.Message gameMessage = 2; // 遊戲服務器
mate.basic.Message mateMessage = 3; // 匹配服務器
}
}
複製代碼
而且須要定義一個對應的路由請求,來處理轉發到哪一臺後端服務器上(實現不一樣也能夠不須要),這裏會涉及到一點業務,例如
message JoinRoomStreamReq {
room.basic.RoomType roomType = 1;
string roomNo = 2;
}
複製代碼
這裏根據客戶端的路由請求的房間號和房間類型,網關來選擇正確的房間服務器(甚至可能連接到舊版本的房間服務器上)
選擇正確的服務器後,創建stream 雙向流
address := "xxxxxxx" // 選擇後的服務器地址
ctx := context.Background() // 頂層context
m := make(map[string]string) // some metadata
streamCtx, cancelFunc := context.WithCancel(ctx) // 複製一個子context
// 創建stream 雙向流
stream, err := xxxClient.Stream(metadata.NewContext(streamCtx, m), client.WithAddress(address))
// 存儲在session上
session.Set("stream", stream)
session.Set("cancelFunc", cancelFunc)
// 啓動一個goroutine 收取stream消息並轉發
go func(c context.Context, s pb.xxxxxStream) {
// 退出時關閉 stream
defer func() {
session.Set("stream", nil)
session.Set("cancelFunc", nil)
if err := s.Close(); err != nil {
// do something with close err
}
}()
for {
select {
case <-c.Done():
// do something with ctx cancel
return
default:
res, err := s.Recv()
if err != nil {
// do something with recv err
return
}
// send to session 這裏能夠經過oneof包裝告知客戶端是哪一個stream發來的消息
...
}
}
}(streamCtx, stream)
複製代碼
轉發就比較簡單了,直接上代碼
對於某個stream的請求
message Stream {
oneof stream {
room.basic.Message roomMessage = 1; // 房間服務器
game.basic.Message gameMessage = 2; // 遊戲服務器
mate.basic.Message mateMessage = 3; // 匹配服務器
}
}
複製代碼
添加轉發代碼
s, exits := session.Get("stream")
if !exits {
return
}
if stream, ok := s.(pb.xxxxStream); ok {
err := stream.Send(message)
if err != nil {
log.Println("send err:", err)
return
}
}
複製代碼
當須要關閉某個stream時, 只須要調用對應的cancelFunc
便可
if v, e := session.Get("cancelFunc"); e {
if c, ok := v.(context.CancelFunc); ok {
c()
}
}
複製代碼
因爲接收請求的入口統一,使用oneof
就能夠一路switch case
,每添加一個req
或者一種stream
只須要添加一個case, 代碼看起來仍是比較簡單、清爽的
func HandleMessageBinary(session *melody.Session, bytes []byte) {
var msg pb.Message
if err := proto.Unmarshal(bytes, &msg); err != nil {
// do something
return
}
defer func() {
err := recover()
if err != nil {
// do something with panic
}
}()
switch x := msg.Message.(type) {
case *pb.Message_Req:
handleReq(session, x.Req)
case *pb.Message_Stream:
handleStream(session, x.Stream)
case *pb.Message_Ping:
handlePing(session, x.Ping)
default:
log.Println("unknown req type")
}
}
func handleStream(session *melody.Session, message *pb.Stream) {
switch x := message.Stream.(type) {
case *pb.Stream_RoomMessage:
handleRoomStream(session, x.RoomMessage)
case *pb.Stream_GameMessage:
handleGameStream(session, x.GameMessage)
case *pb.Stream_MateMessage:
handleMateStream(session, x.MateMessage)
}
}
複製代碼
對於遊戲熱更新不停服仍是挺重要的,個人思路將會在以後的博客裏介紹,能夠關注一波 嘿嘿
pprof
觀測會發現goroutine
和內存都在緩慢增加,也就是存在goroutine leak!
,緣由在於 micro源碼在包裝grpc時,沒有對關閉stream完善,只有收到io.EOF
的錯誤時纔會關閉grpc的conn鏈接func (g *grpcStream) Recv(msg interface{}) (err error) {
defer g.setError(err)
if err = g.stream.RecvMsg(msg); err != nil {
if err == io.EOF {
// #202 - inconsistent gRPC stream behavior
// the only way to tell if the stream is done is when we get a EOF on the Recv
// here we should close the underlying gRPC ClientConn
closeErr := g.conn.Close()
if closeErr != nil {
err = closeErr
}
}
}
return
}
複製代碼
而且有一個TODO
// Close the gRPC send stream
// #202 - inconsistent gRPC stream behavior
// The underlying gRPC stream should not be closed here since the
// stream should still be able to receive after this function call
// TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error {
return g.stream.CloseSend()
}
複製代碼
解決方法也比較簡單,本身fork一份源碼改一下關閉stream的時候同時關閉conn(咱們的業務是能夠的由於在grpc stream客戶端和服務端均實現收到err後關閉stream),或者等做者更新用更科學的方式關閉
get
和set
數據時會發生map的讀寫競爭而panic,能夠查看issue,解決方法也比較簡單本人學習golang、micro、k8s、grpc、protobuf等知識的時間較短,若是有理解錯誤的地方,歡迎批評指正,能夠加我微信一塊兒探討學習