使用反射實現簡易的 RPC 框架

原文:create-simple-golang-rpc
代碼:GitHubgit

背景

微服務架構下數據交互通常是對內 RPC,對外 REST,拿筆者所在的社交 App 後端業務舉例:用戶註冊時客戶端會帶上輸入的手機號請求 API 層,API 將手機號傳遞給短信微服務,短信微服務再調用阿里大魚的短信接口,下發驗證碼。github

image-20180911184600689

其實短信發送的業務徹底能夠放到 API 層直接作,session 和 profile 的業務同理。但這麼作有 3 個缺點:golang

  • 部署效率低:若是加上 websocket(保持與客戶端長鏈接)、goexif(用戶頭像解碼)... 等各類第三方依賴,API 項目下的 vendor/ 將會變得臃腫,上輒幾百 MB,每次編譯、部署和測試過程都須要大量時間等待。
  • 開發成本高:當業務繁雜模塊較多時,每一個模塊添加新功能或 fix bug 都要從新完整發布 API 項目,從新測試,測試不經過還得從新發布。
  • 系統可用性差:全部模塊功能都編譯到一個可執行文件中,若某一模塊代碼出現問題,將可能致使整個 API 項目掛掉,全部服務不可用。好比在用戶位置模塊中有經緯度轉城市的功能,須要調用高德地圖的 API,使用 gopool 庫批量併發的去請求轉換,忘記調用 batch.QueueComplete() 結果致使 pool 中 goroutine 的數量只增不減,可能拖垮整個 API 項目。

將業務按功能模塊拆分到各個微服務,具備提升項目協做效率、下降模塊耦合度、提升系統可用性等優勢,可是開發門檻比較高,好比 RPC 框架的使用、後期的服務監控等工做。web

本文實現一個極簡的 RPC 框架,完成 Client 遠程調用 Server 的核心功能,姑且不考慮超時重連、心跳保活等網絡層機制。數據庫

本地調用

在程序中,經常將代碼段封裝成函數執行。如:後端

package main

import "fmt"

type User struct {
    Name string
    Age  int
}

func main() {
    u, err := queryUser(6)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("name: %s, age: %d\n", u.Name, u.Age)
}

// 模擬數據庫查詢
func queryUser(uid int) (User, error) {
    userDB := make(map[int]User)
    userDB[0] = User{"Dennis", 70}
    userDB[1] = User{"Ken", 75}
    userDB[2] = User{"Rob", 62}
    if u, ok := userDB[uid]; ok {
        return u, nil
    }
    return User{}, fmt.Errorf("id %d not in user db", uid)
}

函數 queryUser() 在本地代碼庫中直接調用,就能查詢到想要的用戶信息。websocket

RPC 調用

現將模擬的用戶數據做爲單獨的服務運行,客戶端經過網絡實現調用。大體流程圖以下:網絡

2

注:client 和 server 能夠是兩臺不一樣 IP 的主機,也能夠是本機上兩個端口不一樣的程序。session

如上圖,實現調用的前提是 server 能解析請求數據,client 能解析響應數據,即兩端要約定好數據包的格式。架構

網絡傳輸數據格式

成熟的 RPC 框架會有自定義 TLV 協議(固定長度消息頭 + 變長消息體)等。在 simple_rpc 中儘可能簡化,包的格式以下:

image-20180912092854182

讀取網絡字節流時,須要知道要讀取多少字節做爲的數據部分,故在頭部中使用 4 字節長的 header 部分來標識 data 的長度。讀寫以下:

package simple_rpc

import (
    "encoding/binary"
    "io"
    "net"
)

type Session struct {
    conn net.Conn
}

// 向鏈接中寫數據
func (s *Session) Write(data []byte) error {
    buf := make([]byte, 4+len(data))                       // 4 字節頭部 + 數據長度
    binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) // 寫入頭部
    copy(buf[4:], data)                                    // 寫入數據
    _, err := s.conn.Write(buf)
    if err != nil {
        return err
    }
    return nil
}

// 從鏈接中讀數據
func (s *Session) Read() ([]byte, error) {
    header := make([]byte, 4)
    _, err := io.ReadFull(s.conn, header)
    if err != nil {
        return nil, err
    }
    dataLen := binary.BigEndian.Uint32(header)
    data := make([]byte, dataLen)
    _, err = io.ReadFull(s.conn, data)
    if err != nil {
        return nil, err
    }
    return data, nil
}

注:binary 包只認固定長度的類型,故 header 使用 uint32 而非 int

func TestSession_ReadWrite(t *testing.T) {
    addr := "0.0.0.0:2333"
    cont := "yep"
    wg := sync.WaitGroup{}
    wg.Add(2)
    go func() {
        defer wg.Done()
        l, err := net.Listen("tcp", addr)
        if err != nil {
            t.Fatal(err)
        }
        conn, _ := l.Accept()
        s := Session{conn: conn}
        err = s.Write([]byte(cont))
        if err != nil {
            t.Fatal(err)
        }
    }()

    go func() {
        defer wg.Done()
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            t.Fatal(err)
        }
        s := Session{conn: conn}
        data, err := s.Read()
        if err != nil {
            t.Fatal(err)
        }
        if string(data) != cont {
            t.FailNow()
        }
    }()

    wg.Wait()
}

測試讀寫正常:

image-20180912094311165

反射與 RPC

server 端接收到的數據須要包括:調用的函數名、參數列表。通常咱們會約定第二個返回值是 error 類型,表示 RPC 調用結結果(gRPC 標準)

Call 執行調用

RPC Server 需解決 2 個問題:

  • Client 調用時只傳過來函數名,須要維護函數名到函數之間的 map,才能知道 Client 想要執行什麼函數
  • 從 reflect.Value 到函數調用,使用 Value.Call() 函數
package main

import (
    "fmt"
    "reflect"
)

func main() {
    funcs := make(map[string]reflect.Value) // server 端維護 funcName => func 的 map
    funcs["incr"] = reflect.ValueOf(incr)
    args := []reflect.Value{reflect.ValueOf(1)} // 構建參數(client 傳遞上來)
    vals := funcs["incr"].Call(args)            // 調用執行
    var res []interface{}
    for _, val := range vals {
        res = append(res, val.Interface()) // 處理返回值
    }
    fmt.Println(res)    // [2, <nil>]
}

func incr(n int) (int, error) {
    return n + 1, nil
}

看到這裏,RPC Server 端的核心工做以下:

  • 維護函數名到函數反射值的 map
  • client 端傳遞函數名、參數列表後,解析爲反射值,調用執行
  • 函數的返回值打包經過網絡返回給客戶端

MakeFunc 生成調用

RPC Client 需解決問題:函數的具體實如今 Server 端,Client 只有該函數的原型。使用 MakeFunc() 完成原型到函數的調用。

package main

import (
    "fmt"
    "reflect"
)

func main() {
    swap := func(args []reflect.Value) []reflect.Value {
        return []reflect.Value{args[1], args[0]}
    }

    var intSwap func(int, int) (int, int)
    fn := reflect.ValueOf(&intSwap).Elem() // 獲取 intSwap 未初始化的函數原型
    v := reflect.MakeFunc(fn.Type(), swap) // MakeFunc 使用傳入的函數原型建立一個綁定 swap 的新函數
    fn.Set(v)                              // 爲函數 intSwap 賦值

    fmt.Println(intSwap(1, 2)) // 2 1
}

RPC 數據

咱們定義 RPC 交互的數據格式,即要存儲到上邊網絡字節流中 data 部分的數據:

type RPCData struct {
    Name string
    Args []interface{}
}

定義其對應的編碼解碼函數:

func encode(data RPCData) ([]byte, error) {
    var buf bytes.Buffer
    bufEnc := gob.NewEncoder(&buf)
    if err := bufEnc.Encode(data); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

func decode(b []byte) (RPCData, error) {
    buf := bytes.NewBuffer(b)
    bufDec := gob.NewDecoder(buf)
    var data RPCData
    if err := bufDec.Decode(&data); err != nil {
        return data, err
    }
    return data, nil
}

Server 端

結構

server 端須要維護鏈接與 RPC 函數名到 RPC 函數自己的映射,結構以下:

type Server struct {
    addr  string
    funcs map[string]reflect.Value
}

註冊函數

將函數名與函數的真正實現對應起來:

func (s *Server) Register(rpcName string, f interface{}) {
    if _, ok := s.funcs[rpcName]; ok {
        return
    }
    fVal := reflect.ValueOf(f)
    s.funcs[rpcName] = fVal
}

執行調用

爲了看清楚服務端的工做流程,暫且忽略錯誤處理:

// 等待
func (s *Server) Run() {
    l, _ := net.Listen("tcp", s.addr)
    for {
        conn, _ := l.Accept()
        srvSession := NewSession(conn)

        // 讀取 RPC 調用數據
        b, _ := srvSession.Read()

        // 解碼 RPC 調用數據
        rpcData, _ := decode(b)

        f, ok := s.funcs[rpcData.Name]
        if !ok {
            fmt.Printf("func %s not exists", rpcData.Name)
            return
        }
        
        // 構造函數的參數
        inArgs := make([]reflect.Value, 0, len(rpcData.Args))
        for _, arg := range rpcData.Args {
            inArgs = append(inArgs, reflect.ValueOf(arg))
        }

        // 執行調用
        out := f.Call(inArgs)
        outArgs := make([]interface{}, 0, len(out))
        for _, o := range out {
            outArgs = append(outArgs, o.Interface())
        }

        // 包裝數據返回給客戶端
        respRPCData := RPCData{rpcData.Name, outArgs}
        respBytes, _ := encode(respRPCData)
        srvSession.Write(respBytes)
    }
}

Client 端

直接調用便可:

// fPtr 指向函數原型
func (c *Client) callRPC(rpcName string, fPtr interface{}) {
    fn := reflect.ValueOf(fPtr).Elem()

    // 完成與 Server 的交互
    f := func(args []reflect.Value) []reflect.Value {
        // 處理輸入參數
        inArgs := make([]interface{}, 0, len(args))
        for _, arg := range args {
            inArgs = append(inArgs, arg.Interface())
        }

        // 編碼 RPC 數據並請求
        cliSession := NewSession(c.conn)
        reqRPC := RPCData{Name: rpcName, Args: inArgs}    
        b, _ := encode(reqRPC)
        cliSession.Write(b)
        
        // 解碼響應數據,獲得返回參數
        respBytes, _ := cliSession.Read()
        respRPC, _ := decode(respBytes)

        outArgs := make([]reflect.Value, 0, len(respRPC.Args))
        for i, arg := range respRPC.Args {
            // 必須進行 nil 轉換
            if arg == nil {
                outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
                continue
            }
            outArgs = append(outArgs, reflect.ValueOf(arg))
        }
        return outArgs
    }
    v := reflect.MakeFunc(fn.Type(), f)
    fn.Set(v)
}

MakeFunc 是 Client 從函數原型到網絡調用的關鍵。

測試

func TestRPC(t *testing.T) {
    gob.Register(User{})

    addr := "0.0.0.0:2333"
    srv := NewServer(addr)
    srv.Register("queryUser", queryUser)
    go srv.Run()

    conn, err := net.Dial("tcp", addr)
    if err != nil {
        t.Error(err)
    }
    cli := NewClient(conn)

    var query func(int) (User, error)
    cli.callRPC("queryUser", &query)
    
    // RPC 調用
    u, err := query(1)
    fmt.Println(err, u)
}

type User struct {
    Name string
    Age  int
}

func queryUser(uid int) (User, error) {
    userDB := make(map[int]User)
    userDB[0] = User{"Dennis", 70}
    userDB[1] = User{"Ken", 75}
    userDB[2] = User{"Rob", 62}
    if u, ok := userDB[uid]; ok {
        return u, nil
    }
    return User{}, fmt.Errorf("id %d not in user db", uid)
}

RPC 調用成功,測試經過:

1

總結

如測試文件中所示,queryUser() 沒有在 server.go 中實現,因此本文的 demo 並非徹底意義上的 RPC 框架,不過闡釋清楚了 RPC 的核心點:反射調用。

上邊的 demo 使用裸 net.Conn 進行阻塞式的讀寫。投入生產環境的 RPC 框架每每有着健壯的底層網絡機制,好比使用非阻塞式 IO 讀寫、實現 Client 與 Server 端保持超時重連、心跳檢測等等複雜的機制。

相關文章
相關標籤/搜索