原文:create-simple-golang-rpc
代碼:GitHubgit
微服務架構下數據交互通常是對內 RPC,對外 REST,拿筆者所在的社交 App 後端業務舉例:用戶註冊時客戶端會帶上輸入的手機號請求 API 層,API 將手機號傳遞給短信微服務,短信微服務再調用阿里大魚的短信接口,下發驗證碼。github
其實短信發送的業務徹底能夠放到 API 層直接作,session 和 profile 的業務同理。但這麼作有 3 個缺點:golang
vendor/
將會變得臃腫,上輒幾百 MB,每次編譯、部署和測試過程都須要大量時間等待。batch.QueueComplete()
結果致使 pool 中 goroutine 的數量只增不減,可能拖垮整個 API 項目。將業務按功能模塊拆分到各個微服務,具備提升項目協做效率、下降模塊耦合度、提升系統可用性等優勢,可是開發門檻比較高,好比 RPC 框架的使用、後期的服務監控等工做。web
本文實現一個極簡的 RPC 框架,完成 Client 遠程調用 Server 的核心功能,姑且不考慮超時重連、心跳保活等網絡層機制。數據庫
在程序中,經常將代碼段封裝成函數執行。如:後端
package main import "fmt" type User struct { Name string Age int } func main() { u, err := queryUser(6) if err != nil { fmt.Println(err) return } fmt.Printf("name: %s, age: %d\n", u.Name, u.Age) } // 模擬數據庫查詢 func queryUser(uid int) (User, error) { userDB := make(map[int]User) userDB[0] = User{"Dennis", 70} userDB[1] = User{"Ken", 75} userDB[2] = User{"Rob", 62} if u, ok := userDB[uid]; ok { return u, nil } return User{}, fmt.Errorf("id %d not in user db", uid) }
函數 queryUser()
在本地代碼庫中直接調用,就能查詢到想要的用戶信息。websocket
現將模擬的用戶數據做爲單獨的服務運行,客戶端經過網絡實現調用。大體流程圖以下:網絡
注:client 和 server 能夠是兩臺不一樣 IP 的主機,也能夠是本機上兩個端口不一樣的程序。session
如上圖,實現調用的前提是 server 能解析請求數據,client 能解析響應數據,即兩端要約定好數據包的格式。架構
成熟的 RPC 框架會有自定義 TLV 協議(固定長度消息頭 + 變長消息體)等。在 simple_rpc 中儘可能簡化,包的格式以下:
讀取網絡字節流時,須要知道要讀取多少字節做爲的數據部分,故在頭部中使用 4 字節長的 header 部分來標識 data 的長度。讀寫以下:
package simple_rpc import ( "encoding/binary" "io" "net" ) type Session struct { conn net.Conn } // 向鏈接中寫數據 func (s *Session) Write(data []byte) error { buf := make([]byte, 4+len(data)) // 4 字節頭部 + 數據長度 binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) // 寫入頭部 copy(buf[4:], data) // 寫入數據 _, err := s.conn.Write(buf) if err != nil { return err } return nil } // 從鏈接中讀數據 func (s *Session) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(s.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(s.conn, data) if err != nil { return nil, err } return data, nil }
注:binary 包只認固定長度的類型,故 header 使用 uint32 而非 int
func TestSession_ReadWrite(t *testing.T) { addr := "0.0.0.0:2333" cont := "yep" wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() l, err := net.Listen("tcp", addr) if err != nil { t.Fatal(err) } conn, _ := l.Accept() s := Session{conn: conn} err = s.Write([]byte(cont)) if err != nil { t.Fatal(err) } }() go func() { defer wg.Done() conn, err := net.Dial("tcp", addr) if err != nil { t.Fatal(err) } s := Session{conn: conn} data, err := s.Read() if err != nil { t.Fatal(err) } if string(data) != cont { t.FailNow() } }() wg.Wait() }
測試讀寫正常:
server 端接收到的數據須要包括:調用的函數名、參數列表。通常咱們會約定第二個返回值是 error 類型,表示 RPC 調用結結果(gRPC 標準)
RPC Server 需解決 2 個問題:
Value.Call()
函數package main import ( "fmt" "reflect" ) func main() { funcs := make(map[string]reflect.Value) // server 端維護 funcName => func 的 map funcs["incr"] = reflect.ValueOf(incr) args := []reflect.Value{reflect.ValueOf(1)} // 構建參數(client 傳遞上來) vals := funcs["incr"].Call(args) // 調用執行 var res []interface{} for _, val := range vals { res = append(res, val.Interface()) // 處理返回值 } fmt.Println(res) // [2, <nil>] } func incr(n int) (int, error) { return n + 1, nil }
看到這裏,RPC Server 端的核心工做以下:
RPC Client 需解決問題:函數的具體實如今 Server 端,Client 只有該函數的原型。使用 MakeFunc()
完成原型到函數的調用。
package main import ( "fmt" "reflect" ) func main() { swap := func(args []reflect.Value) []reflect.Value { return []reflect.Value{args[1], args[0]} } var intSwap func(int, int) (int, int) fn := reflect.ValueOf(&intSwap).Elem() // 獲取 intSwap 未初始化的函數原型 v := reflect.MakeFunc(fn.Type(), swap) // MakeFunc 使用傳入的函數原型建立一個綁定 swap 的新函數 fn.Set(v) // 爲函數 intSwap 賦值 fmt.Println(intSwap(1, 2)) // 2 1 }
咱們定義 RPC 交互的數據格式,即要存儲到上邊網絡字節流中 data
部分的數據:
type RPCData struct { Name string Args []interface{} }
定義其對應的編碼解碼函數:
func encode(data RPCData) ([]byte, error) { var buf bytes.Buffer bufEnc := gob.NewEncoder(&buf) if err := bufEnc.Encode(data); err != nil { return nil, err } return buf.Bytes(), nil } func decode(b []byte) (RPCData, error) { buf := bytes.NewBuffer(b) bufDec := gob.NewDecoder(buf) var data RPCData if err := bufDec.Decode(&data); err != nil { return data, err } return data, nil }
server 端須要維護鏈接與 RPC 函數名到 RPC 函數自己的映射,結構以下:
type Server struct { addr string funcs map[string]reflect.Value }
將函數名與函數的真正實現對應起來:
func (s *Server) Register(rpcName string, f interface{}) { if _, ok := s.funcs[rpcName]; ok { return } fVal := reflect.ValueOf(f) s.funcs[rpcName] = fVal }
爲了看清楚服務端的工做流程,暫且忽略錯誤處理:
// 等待 func (s *Server) Run() { l, _ := net.Listen("tcp", s.addr) for { conn, _ := l.Accept() srvSession := NewSession(conn) // 讀取 RPC 調用數據 b, _ := srvSession.Read() // 解碼 RPC 調用數據 rpcData, _ := decode(b) f, ok := s.funcs[rpcData.Name] if !ok { fmt.Printf("func %s not exists", rpcData.Name) return } // 構造函數的參數 inArgs := make([]reflect.Value, 0, len(rpcData.Args)) for _, arg := range rpcData.Args { inArgs = append(inArgs, reflect.ValueOf(arg)) } // 執行調用 out := f.Call(inArgs) outArgs := make([]interface{}, 0, len(out)) for _, o := range out { outArgs = append(outArgs, o.Interface()) } // 包裝數據返回給客戶端 respRPCData := RPCData{rpcData.Name, outArgs} respBytes, _ := encode(respRPCData) srvSession.Write(respBytes) } }
直接調用便可:
// fPtr 指向函數原型 func (c *Client) callRPC(rpcName string, fPtr interface{}) { fn := reflect.ValueOf(fPtr).Elem() // 完成與 Server 的交互 f := func(args []reflect.Value) []reflect.Value { // 處理輸入參數 inArgs := make([]interface{}, 0, len(args)) for _, arg := range args { inArgs = append(inArgs, arg.Interface()) } // 編碼 RPC 數據並請求 cliSession := NewSession(c.conn) reqRPC := RPCData{Name: rpcName, Args: inArgs} b, _ := encode(reqRPC) cliSession.Write(b) // 解碼響應數據,獲得返回參數 respBytes, _ := cliSession.Read() respRPC, _ := decode(respBytes) outArgs := make([]reflect.Value, 0, len(respRPC.Args)) for i, arg := range respRPC.Args { // 必須進行 nil 轉換 if arg == nil { outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i))) continue } outArgs = append(outArgs, reflect.ValueOf(arg)) } return outArgs } v := reflect.MakeFunc(fn.Type(), f) fn.Set(v) }
MakeFunc
是 Client 從函數原型到網絡調用的關鍵。
func TestRPC(t *testing.T) { gob.Register(User{}) addr := "0.0.0.0:2333" srv := NewServer(addr) srv.Register("queryUser", queryUser) go srv.Run() conn, err := net.Dial("tcp", addr) if err != nil { t.Error(err) } cli := NewClient(conn) var query func(int) (User, error) cli.callRPC("queryUser", &query) // RPC 調用 u, err := query(1) fmt.Println(err, u) } type User struct { Name string Age int } func queryUser(uid int) (User, error) { userDB := make(map[int]User) userDB[0] = User{"Dennis", 70} userDB[1] = User{"Ken", 75} userDB[2] = User{"Rob", 62} if u, ok := userDB[uid]; ok { return u, nil } return User{}, fmt.Errorf("id %d not in user db", uid) }
RPC 調用成功,測試經過:
如測試文件中所示,queryUser()
沒有在 server.go 中實現,因此本文的 demo 並非徹底意義上的 RPC 框架,不過闡釋清楚了 RPC 的核心點:反射調用。
上邊的 demo 使用裸 net.Conn
進行阻塞式的讀寫。投入生產環境的 RPC 框架每每有着健壯的底層網絡機制,好比使用非阻塞式 IO 讀寫、實現 Client 與 Server 端保持超時重連、心跳檢測等等複雜的機制。