在上一篇文章中咱們先列舉了大體的需求,定義了消息協議。此次咱們着手搭建基本的RPC框架,首先實現基礎的方法調用功能。java
RPC調用的第一步,就是在服務端定義要對外暴露的方法,在grpc或者是thrift中,這一步咱們須要編寫語言無關的idl文件,而後經過idl文件生成對應語言的代碼。而在咱們的框架裏,出於簡單起見,咱們不採用idl的方式,直接在代碼裏定義接口和方法。這裏先規定對外的方法必須遵照如下幾個條件:git
爲何要有這幾個規定呢,具體的緣由是這樣的:由於java中的RPC框架場用到的動態代理在go語言中並不支持,因此咱們須要顯式地定義方法的統一格式,這樣在RPC框架中才能統一地處理不一樣的方法。因此咱們規定了方法的格式:github
這裏咱們須要注意的是,服務提供者在對外暴露時並不須要以接口的形式暴露,只要服務提供者有符合規則的方法便可;而客戶端在調用方法時指定的是服務提供者的具體類型,不能指定接口的名稱,即便服務提供者實現了這個接口。golang
contet.Context緩存
context是go語言提供的關於請求上下文的抽象,它攜帶了請求deadline、cancel信號的信息,還能夠傳遞一些上下文信息,很是適合做爲RPC請求的上下文,咱們能夠在context中設置超時時間,還能夠將一些參數無關的元數據經過context傳遞到服務端。網絡
實際上,方法的固定格式以及用Call和Go來表示同步和異步調用都是go自帶的rpc裏的規則,只是在參數裏增長了context.Context。不得不說go自帶的rpc設計確實十分優秀,值得好好學習理解。框架
首先是面向使用者的RPC框架中的客戶端和服務端接口:異步
type RPCServer interface {
//註冊服務實例,rcvr是receiver的意思,它是咱們對外暴露的方法的實現者,metaData是註冊服務時攜帶的額外的元數據,它描述了rcvr的其餘信息
Register(rcvr interface{}, metaData map[string]string) error
//開始對外提供服務
Serve(network string, addr string) error
}
type RPCClient interface {
//Go表示異步調用
Go(ctx context.Context, serviceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call
//Call表示異步調用
Call(ctx context.Context, serviceMethod string, arg interface{}, reply interface{}) error
Close() error
}
type Call struct {
ServiceMethod string // 服務名.方法名
Args interface{} // 參數
Reply interface{} // 返回值(指針類型)
Error error // 錯誤信息
Done chan *Call // 在調用結束時激活
}
複製代碼
此次先實現RPC調用部分,這兩層暫時忽略,後續再實現。post
接下來咱們須要選擇一個序列化協議,這裏就選以前使用過的messagepack。以前設計的通訊協議分爲兩個部分:head和body,這兩個部分都須要進行序列化和反序列化。head部分是元數據,能夠直接採用messagepack序列化,而body部分是方法的參數或者響應,其序列化由head中的SerializeType決定,這樣的好處就是爲了後續擴展方便,目前也使用messagepack序列化,後續也能夠採用其餘的方式序列化。學習
序列化的邏輯也定義爲接口:
type Codec interface {
Encode(value interface{}) ([]byte, error)
Decode(data []byte, value interface{}) error
}
複製代碼
肯定好了序列化協議以後,咱們就能夠定義消息協議相關的接口了。協議的設計參考上一篇文章:從零開始實現一個RPC框架(零)
接下來就是協議的接口定義:
//Messagge表示一個消息體
type Message struct {
*Header //head部分, Header的定義參考上一篇文章
Data []byte //body部分
}
//Protocol定義瞭如何構造和序列化一個完整的消息體
type Protocol interface {
NewMessage() *Message
DecodeMessage(r io.Reader) (*Message, error)
EncodeMessage(message *Message) []byte
}
複製代碼
根據以前的設計,因此交互都經過接口進行,這樣方便擴展和替換。
協議的接口定義好了以後,接下來就是網絡傳輸層的定義:
//傳輸層的定義,用於讀取數據
type Transport interface {
Dial(network, addr string) error
//這裏直接內嵌了ReadWriteCloser接口,包含Read、Write和Close方法
io.ReadWriteCloser
RemoteAddr() net.Addr
LocalAddr() net.Addr
}
//服務端監聽器定義,用於監聽端口和創建鏈接
type Listener interface {
Listen(network, addr string) error
Accept() (Transport, error)
//這裏直接內嵌了Closer接口,包含Close方法
io.Closer
}
複製代碼
各個層次的接口定義好了以後,就能夠開始搭建基礎的框架了,這裏不附上具體的代碼了,具體代碼能夠參考github連接 ,這裏大體描述一下各個部分的實現思路。
客戶端的功能比較簡單,就是將參數序列化以後,組裝成一個完整的消息體發送出去。請求發送出去的同時,將未完成的請求都緩存起來,每收到一個響應就和未完成的請求進行匹配。
發送請求的核心在Go
和send
方法,Go
的功能是組裝參數,send
方法是將參數序列化並經過傳輸層的接口發送出去,同時將請求緩存到pendingCalls
中。而Call
方法則是直接調用Go
方法並阻塞等待知道返回或者超時。 接收響應的核心在input
方法,input
方法在client初始化完成時經過go input()
執行。input
方法包含一個無限循環,在無限循環中讀取傳輸層的數據並將其反序列化,並將反序列化獲得的響應與緩存的請求進行匹配。
注:send
和input
方法的命名也是從go自帶的rpc裏學來的。
服務端在接受註冊時,會過濾服務提供者的各個方法,將合法的方法緩存起來。
服務端的核心邏輯是serveTransport
方法,它接收一個Transport
對象,而後在一個無限循環中從Transport
讀取數據並反序列化成請求,根據請求指定的方法查找自身緩存的方法,找到對應 的方法後經過反射執行對應的實現並返。執行完成後再根據返回結果或者是執行發生的異常組裝成一個完整的消息,經過Transport
發送出去。
服務端在反射執行方法時,須要將實現者做爲執行的第一個參數,因此參數比方法定義中的參數多一個。
這兩個部分就比較簡單了,codec基本上就是使用messagepack實現了對應的接口;protocol的實現就是根據咱們定義的協議進行解析。
在執行過程當中,除了客戶端的用戶線程和服務端用來執行方法的服務線程,還分別增長了客戶端輪詢線程和服務端監聽線程,大體的示意圖以下:
這裏的線程模型比較簡單,服務端針對每一個創建的鏈接都會建立一個線程(goroutine),雖然說goroutine很輕量,可是也不是徹底沒有消耗的,後續能夠再進一步進行優化,好比把讀取數據反序列化和執行方法拆分到不一樣的線程執行,或者把goroutine池化等等。到此咱們的RPC框架已經具有了雛形,可以支持基礎的RPC調用了。實際上整個框架就是參考go自帶的rpc的結構,客戶端和服務端的線程模型和go自帶的rpc同樣,只是本身定義了序列化和消息協議,並且實現的過程當中保留了擴展的接口,方便後續進行完善和擴展。下一步的規劃是實現過濾器鏈,以便後續實現服務治理相關的功能。