牌類遊戲使用微服務重構筆記(八): 遊戲網關服務器

網關服務器

所謂網關,其實就是維持玩家客戶端的鏈接,將玩家發的遊戲請求轉發到具體後端服務的服務器,具備如下幾個功能點:git

  • 長期運行,必須具備較高的穩定性和性能
  • 對外開放,即客戶端須要知道網關的IP和端口,才能鏈接上來
  • 多協議支持
  • 統一入口,架構中可能存在不少後端服務,若是沒有一個統一入口,則客戶端須要知道每一個後端服務的IP和端口
  • 請求轉發,因爲統一了入口,因此網關必須能將客戶端的請求轉發到準確的服務上,須要提供路由
  • 無感更新,因爲玩家鏈接的是網關服務器,只要鏈接不斷;更新後端服務器對玩家來講是無感知的,或者感知不多(根據實現方式不一樣)
  • 業務無關(對於遊戲服務器網關不可避免的可能會有一點業務)

對於http請求來講,micro框架自己已經實現了api網關,能夠參閱以前的博客github

牌類遊戲使用微服務重構筆記(二): micro框架簡介:micro toolkitgolang

可是對於遊戲服務器,通常都是須要長連接的,須要咱們本身實現web

鏈接協議

網關自己應該是支持多協議的,這裏就以websocket舉例說明我重構過程當中的思路,其餘協議相似。首先選擇提供websocket鏈接的庫 推薦使用melody,基於websocket庫,語法很是簡單,數行代碼便可實現websocket服務器。咱們的遊戲須要websocket網關的緣由在於客戶端不支持HTTP2,不能和grpc服務器直連後端

package main

import (
	"github.com/micro/go-web"
	"gopkg.in/olahol/melody.v1"
	"log"
	"net/http"
)

func main() {
	// New web service
	service := web.NewService(web.Name("go.micro.api.gateway"))

	// parse command line
	service.Init()

	// new melody
	m := melody.New()

	// Handle websocket connection
	service.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		_ = m.HandleRequest(w, r)
	})
	
	// handle connection with new session
	m.HandleConnect(func(session *melody.Session) {
		
	})
	
	// handle disconnection
	m.HandleDisconnect(func(session *melody.Session) {
		
	})
	
	// handle message
	m.HandleMessage(func(session *melody.Session, bytes []byte) {
		
	})

	// run service
	if err := service.Run(); err != nil {
		log.Fatal("Run: ", err)
	}
}
複製代碼

請求轉發

網關能夠收取或發送數據,而且數據結構比較統一都是[]byte,這一點是否是很像grpc stream,所以就可使用protobufoneof特性來定義請求和響應,可參照上期博客api

牌類遊戲使用微服務重構筆記(六): protobuf爬坑bash

定義gateway.basic.proto,對網關收/發的消息進行歸類服務器

message Message {
    oneof message {
        Req req = 1; // 客戶端請求 要求響應
        Rsp rsp = 2; // 服務端響應
        Notify notify = 3; // 客戶端推送 不要求響應
        Event event = 4; // 服務端推送
        Stream stream = 5; // 雙向流請求
        Ping ping = 6; // ping
        Pong pong = 7;// pong
    }
}
複製代碼

對於reqnotify都是客戶端的無狀態請求,對應後端的無狀態服務器,這裏僅須要實現本身的路由規則便可,好比微信

message Req {
    string serviceName = 1; // 服務名
    string method = 2; // 方法名
    bytes args = 3; // 參數
    google.protobuf.Timestamp timestamp = 4; // 時間戳
    ...
}
複製代碼
  • serviceName 調用rpc服務器的服務名
  • method 調用rpc服務器的方法名
  • args 調用參數
  • timestamp 請求時間戳,用於客戶端對服務端響應作匹配識別,模擬http請求req-rsp

思路與micro toolkit的api網關相似(rpc 處理器),比較簡單,可參照以前的博客。websocket

咱們的項目對於此類請求都走http了,並無經過這個網關, 僅有一些基本的req,好比authReq處理session認證。主要考慮是http簡單、無狀態、好維護,再加上此類業務對實時性要求也不高。

grpc stream轉發

遊戲服務器通常都是有狀態的、雙向的、實時性要求較高,req-rsp模式並不適合,就須要網關進行轉發。每添加一種grpc後端服務器,僅須要在oneof中添加一個stream來拓展

message Stream {
   oneof stream {
    room.basic.Message roomMessage = 1; // 房間服務器
    game.basic.Message gameMessage = 2; // 遊戲服務器
    mate.basic.Message mateMessage = 3; // 匹配服務器
   }
}
複製代碼

而且須要定義一個對應的路由請求,來處理轉發到哪一臺後端服務器上(實現不一樣也能夠不須要),這裏會涉及到一點業務,例如

message JoinRoomStreamReq {
    room.basic.RoomType roomType = 1;
    string roomNo = 2;
}
複製代碼

這裏根據客戶端的路由請求的房間號和房間類型,網關來選擇正確的房間服務器(甚至可能連接到舊版本的房間服務器上)

選擇正確的服務器後,創建stream 雙向流

address := "xxxxxxx" // 選擇後的服務器地址
ctx := context.Background() // 頂層context
m := make(map[string]string) // some metadata
streamCtx, cancelFunc := context.WithCancel(ctx) // 複製一個子context

// 創建stream 雙向流
stream, err := xxxClient.Stream(metadata.NewContext(streamCtx, m), client.WithAddress(address))

// 存儲在session上
session.Set("stream", stream)
session.Set("cancelFunc", cancelFunc)

// 啓動一個goroutine 收取stream消息並轉發
go func(c context.Context, s pb.xxxxxStream) {
    // 退出時關閉 stream
    defer func() {
	session.Set("stream", nil)
	session.Set("cancelFunc", nil)

	if err := s.Close(); err != nil {
		// do something with close err
	}
    }()

    for {
	select {
    	case <-c.Done():
    		// do something with ctx cancel
    		return
    
    	default:
    		res, err := s.Recv()
    		if err != nil {
    			// do something with recv err
    			return
    		}
    
    		// send to session 這裏能夠經過oneof包裝告知客戶端是哪一個stream發來的消息
            ...
    	}
    }
}(streamCtx, stream)
複製代碼

轉發就比較簡單了,直接上代碼

對於某個stream的請求

message Stream {
   oneof stream {
    room.basic.Message roomMessage = 1; // 房間服務器
    game.basic.Message gameMessage = 2; // 遊戲服務器
    mate.basic.Message mateMessage = 3; // 匹配服務器
   }
}
複製代碼

添加轉發代碼

s, exits := session.Get("stream")
if !exits {
    return
}

if stream, ok := s.(pb.xxxxStream); ok {
    err := stream.Send(message)
    if err != nil {
    	log.Println("send err:", err)
    	return
    }
}
複製代碼

當須要關閉某個stream時, 只須要調用對應的cancelFunc便可

if v, e := session.Get("cancelFunc"); e {
    if c, ok := v.(context.CancelFunc); ok {
    	c()
    }
}
複製代碼

使用oneOf的好處

因爲接收請求的入口統一,使用oneof就能夠一路switch case,每添加一個req或者一種stream只須要添加一個case, 代碼看起來仍是比較簡單、清爽的

func HandleMessageBinary(session *melody.Session, bytes []byte) {
    var msg pb.Message

    if err := proto.Unmarshal(bytes, &msg); err != nil {
    	// do something
    	return
    }
    
    defer func() {
	err := recover()
	if err != nil {
	    // do something with panic
	}
    }()
    
    switch x := msg.Message.(type) {
    case *pb.Message_Req:
    	handleReq(session, x.Req)
    
    case *pb.Message_Stream:
    	handleStream(session, x.Stream)
    
    case *pb.Message_Ping:
    	handlePing(session, x.Ping)
    
    default:
    	log.Println("unknown req type")
    }
}

func handleStream(session *melody.Session, message *pb.Stream) {
    switch x := message.Stream.(type) {
    case *pb.Stream_RoomMessage:
        handleRoomStream(session, x.RoomMessage)
    
    case *pb.Stream_GameMessage:
        handleGameStream(session, x.GameMessage)
    
    case *pb.Stream_MateMessage:
        handleMateStream(session, x.MateMessage)
    }
}
複製代碼

熱更新

對於遊戲熱更新不停服仍是挺重要的,個人思路將會在以後的博客裏介紹,能夠關注一波 嘿嘿

坑!

  • 這樣的網關,看似沒什麼問題,然而跑上一段時間使用pprof觀測會發現goroutine和內存都在緩慢增加,也就是存在goroutine leak!,緣由在於 micro源碼在包裝grpc時,沒有對關閉stream完善,只有收到io.EOF的錯誤時纔會關閉grpc的conn鏈接
func (g *grpcStream) Recv(msg interface{}) (err error) {
    defer g.setError(err)
    if err = g.stream.RecvMsg(msg); err != nil {
    	if err == io.EOF {
            // #202 - inconsistent gRPC stream behavior
            // the only way to tell if the stream is done is when we get a EOF on the Recv
            // here we should close the underlying gRPC ClientConn
            closeErr := g.conn.Close()
            if closeErr != nil {
                err = closeErr
            }
    	}
    }
    return
}
複製代碼

而且有一個TODO

// Close the gRPC send stream
// #202 - inconsistent gRPC stream behavior
// The underlying gRPC stream should not be closed here since the
// stream should still be able to receive after this function call
// TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error {
    return g.stream.CloseSend()
}
複製代碼

解決方法也比較簡單,本身fork一份源碼改一下關閉stream的時候同時關閉conn(咱們的業務是能夠的由於在grpc stream客戶端和服務端均實現收到err後關閉stream),或者等做者更新用更科學的方式關閉

  • melody的session在getset數據時會發生map的讀寫競爭而panic,能夠查看issue,解決方法也比較簡單

一塊兒學習

本人學習golang、micro、k8s、grpc、protobuf等知識的時間較短,若是有理解錯誤的地方,歡迎批評指正,能夠加我微信一塊兒探討學習

相關文章
相關標籤/搜索