go語言實現本身的RPC:go rpc codec

前言

RPC是遠程過程調用(Remote Procedure Call)的簡稱,經過RPC咱們能夠像調用本地方法同樣調用位於其餘位置的函數。你們更常見的多是HTTP API調用,簡單來對比的話,RPC比起HTTP調用封裝更完善,調用者沒必要手動處理序列化和反序列化,使用成本更低一些(雖然學習成本可能會更高)。json

出於學習目的,此次的目標是使用go語言來實現一個本身的RPC。在現實世界裏,對於一個RPC工具,除了方法調用之外,人們更看重的是其餘功能好比服務發現、負載均衡、熔斷降級之類的功能,這裏暫時不會涉及,而是僅關注實現一個能夠工做的方法調用。緩存

以前的文章裏大體瞭解了go語言自帶的rpc框架,其中就提到go rpc預留了codec接口,可讓用戶在go rpc使用本身的序列化協議,此次就嘗試實現一個本身的codec來實現本身的RPC。bash

準備工做

序列化協議

要實現一個RPC,基本的元素大概有這幾個:序列化協議、網絡模型和線程模型。而go rpc裏的codec基本上實現的就是序列化協議。網絡

原本想着用比較熟悉的thrift協議,可是使用thrift自己實現了RPC流程,因此它並非一個單純的序列化協議,它的序列化邏輯可能沒法和go rpc很好的契合,再加上還須要書寫IDL定義,增長複雜度。原本就是爲了熟悉go,因此這裏先從簡單的開始,因而選擇messagepack做爲序列化協議。併發

messagepack是一個比較輕量級的序列化協議,它的邏輯和json相似,可是使用的是二進制形式,因此比json序列化更快,序列化後產生的數據也更小,基本上能夠認爲是一個二進制版本的json。負載均衡

建立類定義

要實現本身的codec,須要分別實現go rpc中提供個兩個接口:ServerCodec和ClientCodec,很明顯他們分別表示服務端和客戶端的邏輯,兩個接口的定義具體以下:框架

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	Close() error
}
type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error
	Close() error
}
複製代碼

能夠看到,go rpc將一次請求/響應抽象成了header+body的形式,讀取數據時分爲讀取head和讀取body,寫入數據時只需寫入body部分,go rpc會替咱們加上head部分。 接下來咱們定義兩個結構,用來表示一次請求/響應的完整數據:tcp

type MsgpackReq struct {
	rpc.Request  //head
	Arg interface{} //body
}

type MsgpackResp struct {
	rpc.Response  //head
	Reply interface{}  //body
}
複製代碼

這裏的msgpackReq和msgpackResp直接內嵌了go rpc裏自帶的Request和Response,自帶的Request和Response定義了序號、方法名等信息。函數

接下來就是自定義Codec的聲明:工具

type MessagePackServerCodec struct {
	rwc    io.ReadWriteCloser //用於讀寫數據,實際是一個網絡鏈接
	req    MsgpackReq //用於緩存解析到的請求
	closed bool  //標識codec是否關閉
}

type MessagePackClientCodec struct {
	rwc    io.ReadWriteCloser
	resp   MsgpackResp  //用於緩存解析到的請求
	closed bool
}

func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
	return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}

func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
	return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}
複製代碼

在以前的文章裏提到了,codec須要包含一個數據源用於讀寫數據,這裏直接將網路鏈接傳遞進去。

實現Codec方法

實現思路

接下來是具體的方法實現,出於簡單起見,這裏將反序列化部分的兩步合併爲一步,在讀取head部分時就將全部的數據解析好並緩存起來,讀取body時直接返回緩存的結果。具體的思路就是:

  1. 客戶端在發送請求時,將數據包裝成一個MsgpackReq,而後用messagepack序列化併發送出去
  2. 服務端在讀取請求head部分時,將收到的數據用messagepack反序列化成一個MsgpackReq,並將獲得的結果緩存起來
  3. 服務端在讀取請求body部分時,從緩存的MsgpackReq中獲取到Arg字段並返回
  4. 服務端在發送響應時,將數據包裝成一個MsgpackResp,而後用messagepack序列化併發送出去
  5. 客戶端在讀取響應head部分時,將收到的數據用messagepack反序列化成一個MsgpackResp,並將獲得的結果緩存起來
  6. 客戶端在讀取響應body部分時,從緩存的MsgpackResp中獲取到Reply或者Error字段並返回

Client實現

這裏直接上代碼:

func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
	//先判斷codec是否已經關閉,若是是則直接返回
	if c.closed {
		return nil
	}
	//將r和arg組裝成一個MsgpackReq並序列化
	request := &MsgpackReq{*r, arg}
	reqData, err := msgpack.Marshal(request)
	if err != nil {
		panic(err)
		return err
	}
	//先發送數據長度
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(reqData)))
	_, err = c.rwc.Write(head)
	//再將序列化產生的數據發送出去
	_, err = c.rwc.Write(reqData)
	return err
}

func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
	//先判斷codec是否已經關閉,若是是則直接返回
	if c.closed {
		return nil
	}
	//讀取數據
	data, err := readData(c.rwc)
	if err != nil {
		//client一旦初始化就會開始輪詢數據,因此要處理鏈接close的狀況
		if strings.Contains(err.Error(), "use of closed network connection") {
			return nil
		}
		panic(err) //簡單起見,出現異常直接panic
	}

	//將讀取到的數據反序列化成一個MsgpackResp
	var response MsgpackResp
	err = msgpack.Unmarshal(data, &response)

	if err != nil {
		panic(err) //簡單起見,出現異常直接panic
	}

	//根據讀取到的數據設置request的各個屬性
	r.ServiceMethod = response.ServiceMethod
	r.Seq = response.Seq
	//同時將讀取到的數據緩存起來
	c.resp = response

	return nil
}

func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
	//這裏直接用緩存的數據返回便可

	if "" != c.resp.Error {//若是返回的是異常
		return errors.New(c.resp.Error)
	}
	if reply != nil {
		//正常返回,經過反射將結果設置到reply變量,由於reply必定是指針類型,因此沒必要檢查CanSet
		reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
	}
	return nil
}


func (c *MessagePackClientCodec) Close() error {
	c.closed = true //關閉時將closed設置爲true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
複製代碼

以上就是client部分的實現,值得注意的有幾點:

  1. 讀寫數據前,須要檢查codec是否已經關閉了
  2. 讀寫數據時須要處理拆包粘包(經過readData函數處理)

Server實現

一樣直接上代碼:

func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
	//先判斷codec是否已經關閉,若是是則直接返回
	if c.closed {
		return nil
	}
	//將r和reply組裝成一個MsgpackResp並序列化
	response := &MsgpackResp{*r, reply}

	respData, err := msgpack.Marshal(response)
	if err != nil {
		panic(err)
		return err
	}
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(respData)))
	_, err = c.rwc.Write(head)
	//將序列化產生的數據發送出去
	_, err = c.rwc.Write(respData)
	return err
}

func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
	//先判斷codec是否已經關閉,若是是則直接返回
	if c.closed {
		return nil
	}
	//讀取數據
	data, err := readData(c.rwc)
	if err != nil {
		//這裏不能直接panic,須要處理EOF和reset的狀況
		if err == io.EOF {
			return err
		}
		if strings.Contains(err.Error(), "connection reset by peer") {
			return err
		}
		panic(err) //其餘異常直接panic
	}
	//將讀取到的數據反序列化成一個MsgpackReq
	var request MsgpackReq
	err = msgpack.Unmarshal(data, &request)

	if err != nil {
		panic(err) //簡單起見,出現異常直接panic
	}

	//根據讀取到的數據設置request的各個屬性
	r.ServiceMethod = request.ServiceMethod
	r.Seq = request.Seq
	//同時將解析到的數據緩存起來
	c.req = request

	return nil
}

func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
	if arg != nil {
		//參數不爲nil,經過反射將結果設置到arg變量
		reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
	}
	return nil
}

func (c *MessagePackServerCodec) Close() error {
	c.closed = true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
複製代碼

實際上server端的實現幾乎和client端的邏輯同樣,只是request和response的角色不一樣而已。其中有幾點須要注意:

  1. server端讀取數據時須要處理EOF和鏈接reset的狀況
  2. server在返回數據時沒有顯式處理接口產生的error,只是將reply傳遞了回去,這是由於error在rpc.Request裏存着,不用codec處理

處理拆包粘包

具體思路參考go語言處理TCP拆包/粘包 ,這裏附上readData的實現:

func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
	const HeadSize = 4 //設定長度部分佔4個字節
	headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
	headData := make([]byte, HeadSize)
	for {
		readSize, err := conn.Read(headData)
		if err != nil {
			returnError = err
			return
		}
		headBuf.Write(headData[0:readSize])
		if headBuf.Len() == HeadSize {
			break
		} else {
			headData = make([]byte, HeadSize-readSize)
		}
	}
	bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
	bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
	bodyData := make([]byte, bodyLen)
	for {
		readSize, err := conn.Read(bodyData)
		if err != nil {
			returnError = err
			return
		}
		bodyBuf.Write(bodyData[0:readSize])
		if bodyBuf.Len() == bodyLen {
			break
		} else {
			bodyData = make([]byte, bodyLen-readSize)
		}
	}
	data = bodyBuf.Bytes()
	returnError = nil
	return
}
複製代碼

測試代碼

接下來咱們經過簡單的Echo調用測試一下咱們的codec:

//聲明接口類
type EchoService struct {}
//定義方法Echo
func (service *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}
//服務端啓動邏輯
func RegisterAndServeOnTcp() {
	err := rpc.Register(&EchoService{})//註冊並非註冊方法,而是註冊EchoService的一個實例
	if err != nil {
		log.Fatal("error registering", err)
		return
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
	if err != nil {
		log.Fatal("error resolving tcp", err)
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting", err)
		} else {
			//這裏先經過NewServerCodec得到一個實例,而後調用rpc.ServeCodec來啓動服務
			rpc.ServeCodec(msgpk.NewServerCodec(conn))
		}
	}
}
//客戶端調用邏輯
func Echo(arg string) (result string, err error) {
	var client *rpc.Client
	conn, err := net.Dial("tcp", ":1234")
	client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))

	defer client.Close()

	if err != nil {
		return "", err
	}
	err = client.Call("EchoService.Echo", arg, &result) //經過類型加方法名指定要調用的方法
	if err != nil {
		return "", err
	}
	return result, err
}
//main函數
func main() {
	go server.RegisterAndServeOnTcp() //先啓動服務端
	time.Sleep(1e9)
	wg := new(sync.WaitGroup) //waitGroup用於阻塞主線程防止提早退出
	callTimes := 10
	wg.Add(callTimes)
	for i := 0; i < callTimes; i++ {
		go func() {
		        //使用hello world加一個隨機數做爲參數
			argString := "hello world "+strconv.Itoa(rand.Int())
			resultString, err := client.Echo(argString)
			if err != nil {
				log.Fatal("error calling:", err)
			}
			if resultString != argString {
				fmt.Println("error")
			} else {
				fmt.Printf("echo:%s\n", resultString)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}
複製代碼

上面的例子裏首先經過go server.RegisterAndServeOnTcp()啓動了服務端,而後同時啓動了10個go routine來發起請求,客戶端在收到響應以後會打印對應的結果。最後執行main函數,控制檯會輸出結果(後面的隨機數可能會不一樣):

echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320
複製代碼

結語

到這裏,一個簡單的自定義的go語言rpc就已經完成了,雖然自定義部分只有序列化協議部分而已,好比線程模型還是go rpc自帶的邏輯,除此以外也沒有前言裏提到的各類高級功能。後續再考慮嘗試用go語言從零開始實現一個RPC吧。

其餘

併發場景

有細心的同窗可能已經發現了,這裏實現的邏輯當中徹底沒有考慮併發的問題,緩存數據也是直接放到codec對象。而這樣簡單的實現也不會致使併發調用失敗,其中具體的緣由就是go rpc在處理每一個codec對象時,讀取請求都是順序的,而後再併發的處理請求並返回結果。

相關文章
相關標籤/搜索