平臺開發 360雲計算 服務器
女主宣言網絡
最近,小編一直在研究RPC的原理及實現方式。在本篇文章中將經過用300行純Golang編寫簡單的RPC框架來解釋RPC。但願能幫助你們梳理RPC相關知識點。app
PS:豐富的一線技術、多元化的表現形式,盡在「360雲計算」,點關注哦!框架
我 們 通 過從頭開始在Golang中構建一個簡單的RPC框架來學習RPC基礎構成。1tcp
什麼是RPCide
簡單地說,服務A想調用服務B的函數。可是這兩個服務不在同一個內存空間中。因此不能直接調用它。函數
所以,爲了實現這個調用,咱們須要表達如何調用以及如何經過網絡傳遞通訊的語義。學習
讓咱們考慮一下,當咱們能夠在相同的內存空間(本地調用)中運行時,咱們要怎麼作。測試
type User struct {
Name string
Age int
}
var userDB = map[int]User{
1: User{"Ankur", 85},
9: User{"Anand", 25},
8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
if u, ok := userDB[id]; ok {
return u, nil
}
return User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
u , err := QueryUser(8)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)
}
如今咱們如何在網絡上進行相同的函數調用ui
客戶端將經過網絡調用 QueryUser(id int) 函數,而且將有一個服務端提供對該函數的調用,並返回響應 User{"Name", id}, nil。
2
網絡傳輸數據格式
咱們將採用TLV(定長報頭+變長消息體)編碼方案來規範 tcp 上的數據傳輸。稍後會詳細介紹
在經過網絡發送數據以前,咱們須要定義如何經過網絡發送數據的結構。
這有助於咱們定義一個通用協議,客戶端和服務端均可以理解這個協議。(protobuf IDL定義了服務端和客戶端都能理解的內容)。
所以,服務端接收到的數據、要調用的函數名和參數列表,或者來自客戶端的數據都須要傳遞這些參數。
另外,讓咱們約定第二個返回值的類型爲 error,表示RPC調用結果。
// RPC數據傳輸格式
type RPCdata struct {
Name string // name of the function
Args []interface{} // request's or response's body expect error.
Err string // Error any executing remote server
}
如今咱們有了一個格式,咱們須要序列化它以便咱們能夠經過網絡發送它。在本例中,咱們將使用 go 默認的二進制序列化協議進行編碼和解碼。
// be sent over the network.
func Encode(data RPCdata) ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(data); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Decode the binary data into the Go struct
func Decode(b []byte) (RPCdata, error) {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)
var data RPCdata
if err := decoder.Decode(&data); err != nil {
return Data{}, err
}
return data, nil
}
3
網絡傳輸
選擇 TLV 協議的緣由是因爲其很是容易實現,同時也完成了咱們須要識別的數據讀取的長度,由於咱們須要肯定這個請求讀取的字節數的傳入請求流。發送和接收都執行相同的操做。
// Transport will use TLV protocol
type Transport struct {
conn net.Conn // Conn is a generic stream-oriented network connection.
}
// NewTransport creates a Transport
func NewTransport(conn net.Conn) *Transport {
return &Transport{conn}
}
// Send TLV data over the network
func (t *Transport) Send(data []byte) error {
// we will need 4 more byte then the len of data
// as TLV header is 4bytes and in this header
// we will encode how much byte of data
// we are sending for this request.
buf := make([]byte, 4+len(data))
binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
copy(buf[4:], data)
_, err := t.conn.Write(buf)
if err != nil {
return err
}
return nil
}
// Read TLV sent over the wire
func (t *Transport) Read() ([]byte, error) {
header := make([]byte, 4)
_, err := io.ReadFull(t.conn, header)
if err != nil {
return nil, err
}
dataLen := binary.BigEndian.Uint32(header)
data := make([]byte, dataLen)
_, err = io.ReadFull(t.conn, data)
if err != nil {
return nil, err
}
return data, nil
}
如今咱們已經定義了數據格式和傳輸協議。下面咱們還須要RPC服務器和RPC客戶端的實現。
4
RPC服務器
RPC服務器將接收具備函數名的 RPCData。所以,咱們須要維護和映射包含函數名到實際函數映射的函數
// RPCServer ...
type RPCServer struct {
addr string
funcs map[string] reflect.Value
}
// Register the name of the function and its entries
func (s *RPCServer) Register(fnName string, fFunc interface{}) {
if _,ok := s.funcs[fnName]; ok {
return
}
s.funcs[fnName] = reflect.ValueOf(fFunc)
}
如今咱們已經註冊了 func,當咱們收到請求時,咱們將檢查函數執行期間傳遞的func的名稱是否存在。而後執行相應的操做
// Execute the given function if present
func (s *RPCServer) Execute(req RPCdata) RPCdata {
// get method by name
f, ok := s.funcs[req.Name]
if !ok {
// since method is not present
e := fmt.Sprintf("func %s not Registered", req.Name)
log.Println(e)
return RPCdata{Name: req.Name, Args: nil, Err: e}
}
log.Printf("func %s is called\n", req.Name)
// unpackage request arguments
inArgs := make([]reflect.Value, len(req.Args))
for i := range req.Args {
inArgs[i] = reflect.ValueOf(req.Args[i])
}
// invoke requested method
out := f.Call(inArgs)
// now since we have followed the function signature style where last argument will be an error
// so we will pack the response arguments expect error.
resArgs := make([]interface{}, len(out) - 1)
for i := 0; i < len(out) - 1; i ++ {
// Interface returns the constant value stored in v as an interface{}.
resArgs[i] = out[i].Interface()
}
// pack error argument
var er string
if e, ok := out[len(out) - 1].Interface().(error); ok {
// convert the error into error string value
er = e.Error()
}
return RPCdata{Name: req.Name, Args: resArgs, Err: er}
}
5
RPC客戶端
因爲函數的具體實如今服務器端,客戶端只有函數的原型,因此咱們須要調用函數的完整原型,這樣咱們才能調用它。
func (c *Client) callRPC(rpcName string, fPtr interface{}) {
container := reflect.ValueOf(fPtr).Elem()
f := func(req []reflect.Value) []reflect.Value {
cReqTransport := NewTransport(c.conn)
errorHandler := func(err error) []reflect.Value {
outArgs := make([]reflect.Value, container.Type().NumOut())
for i := 0; i < len(outArgs)-1; i++ {
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
return outArgs
}
// Process input parameters
inArgs := make([]interface{}, 0, len(req))
for _, arg := range req {
inArgs = append(inArgs, arg.Interface())
}
// ReqRPC
reqRPC := RPCdata{Name: rpcName, Args: inArgs}
b, err := Encode(reqRPC)
if err != nil {
panic(err)
}
err = cReqTransport.Send(b)
if err != nil {
return errorHandler(err)
}
// receive response from server
rsp, err := cReqTransport.Read()
if err != nil { // local network error or decode error
return errorHandler(err)
}
rspDecode, _ := Decode(rsp)
if rspDecode.Err != "" { // remote server error
return errorHandler(errors.New(rspDecode.Err))
}
if len(rspDecode.Args) == 0 {
rspDecode.Args = make([]interface{}, container.Type().NumOut())
}
// unpackage response arguments
numOut := container.Type().NumOut()
outArgs := make([]reflect.Value, numOut)
for i := 0; i < numOut; i++ {
if i != numOut-1 { // unpackage arguments (except error)
if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value
outArgs[i] = reflect.Zero(container.Type().Out(i))
} else {
outArgs[i] = reflect.ValueOf(rspDecode.Args[i])
}
} else { // unpackage error argument
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
}
return outArgs
}
container.Set(reflect.MakeFunc(container.Type(), f))
}
6
測試一下咱們的框架
package main
import (
"encoding/gob"
"fmt"
"net"
)
type User struct {
Name string
Age int
}
var userDB = map[int]User{
1: User{"Ankur", 85},
9: User{"Anand", 25},
8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
if u, ok := userDB[id]; ok {
return u, nil
}
return User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
// new Type needs to be registered
gob.Register(User{})
addr := "localhost:3212"
srv := NewServer(addr)
// start server
srv.Register("QueryUser", QueryUser)
go srv.Run()
// wait for server to start.
time.Sleep(1 * time.Second)
// start client
conn, err := net.Dial("tcp", addr)
if err != nil {
panic(err)
}
cli := NewClient(conn)
var Query func(int) (User, error)
cli.callRPC("QueryUser", &Query)
u, err := Query(1)
if err != nil {
panic(err)
}
fmt.Println(u)
u2, err := Query(8)
if err != nil {
panic(err)
}
fmt.Println(u2)
}
執行:go run main.go
輸出內容
2019/07/23 20:26:18 func QueryUser is called
{Ankur 85}
2019/07/23 20:26:18 func QueryUser is called
{Ankur Anand 27}
總結
致此咱們簡單的RPC框架就實現完成了,旨在幫你們理解RPC的原理及上手簡單實踐。若是你們對這篇文章中所講內容有異議,或者想進一步討論,請留言回覆。