RPC 框架在微服務中重要的一部分,熟悉和了解其原理是很是有必要的。Go 語言中源碼自帶實現了 RPC 功能,雖然官方已經宣佈再也不更新,可是因它實現簡單,代碼量不大,不少地方值得學習和借鑑,是閱讀 RPC 源碼的一個很是好的開始。git
源碼地址: github.com/golang/go/t…github
先來看看調用的官方例子:golang
// content of server.go
package main
import(
"net"
"net/rpc"
"net/http"
"errors"
"log"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func listenTCP(addr string) (net.Listener, string) {
l, e := net.Listen("tcp", addr)
if e != nil {
log.Fatalf("net.Listen tcp :0: %v", e)
}
return l, l.Addr().String()
}
func main() {
rpc.Register(new(Arith)) //註冊服務
var l net.Listener
tcpAddr := "127.0.0.1:8080"
l, serverAddr := listenTCP(tcpAddr) //監聽TCP鏈接
log.Println("RPC server listening on", serverAddr)
go rpc.Accept(l)
rpc.HandleHTTP() //監聽HTTP鏈接
httpAddr := "127.0.0.1:8081"
l, serverAddr = listenTCP(httpAddr)
log.Println("RPC server listening on", serverAddr)
go http.Serve(l, nil)
select{}
}
複製代碼
rpc調用的功能就是Arith實現了一個Multiply和Divide方法。 看main函數,rpc實現了一個註冊rpc.Register(new(Arith))
方法,而後啓動監聽listenTCP(tcpAddr)
,這個是經過net包中的Listen方法,監聽的對象能夠是TCP鏈接rpc.Accept(l)
,也能夠試HTTP鏈接http.Serve(l, nil)
,這個是藉助net/http包啓動HTTPServer.json
// content of client.go
package main
import(
"net/rpc"
"log"
"fmt"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := &Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
// Asynchronous call
clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
if err != nil {
log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
if replyCall.Error != nil {
fmt.Println(replyCall.Error)
} else {
fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
}
複製代碼
客戶端代碼rpc 提供了兩個方法 rpc.DialHTTP
和 rpc.Dial
分別提供監聽 HTTP 和 Tcp 鏈接。而後經過 Call
或者 Go
來調用服務器的方法,兩者的區別是一個是同步調用,Go
是異步調用。緩存
運行結果:服務器
// server.go
➜ server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
複製代碼
// client.go
➜ client ./client
Arith: 7*8=56
Arith: 7/8=0...7
複製代碼
先來看看客戶端的源碼,先上一張圖瞭解一下客戶端代碼的主要邏輯:網絡
Dial
and DialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
複製代碼
Dial
創建在 net.Dial 上,返回一個client對象,DialHTTP
跟Dial
相似,只不過多了一些HTTP的處理,最終都是返回 NewClient(conn)。框架
NewClient
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
複製代碼
NewClient
裏作了2件事,第一件事是生成client結構體對象,包括序列化方式,初始化其中對象等等,Go Rpc默認採用的是gob序列化,但也能夠用json或者protobuf。第二件事是啓動一個goroutine協程,調用了 input
方法,這個client的核心部分,下面再講。異步
Call
and Go
上面例子中,生成client對象後,會顯式的調用Call
或 Go
,表示同步調用和異步調用。下面來看看源碼:func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
複製代碼
能夠看到,client.Call
方法其實也是調用client.Go
,只不過經過chan
進行阻塞。socket
生成一個Call的結構體,將服務器的調用方法、參數、返回參數,調用結束標記進行組裝,而後調用client.send
的方法,將call結構體發給服務器。服務器拿到這些參數後,會經過反射出具體的方法,而後執行對應的函數。 下面是Call
結構體的定義:
// Call
type Call struct {
ServiceMethod string // The name of the service and method to call. 服務方法名
Args interface{} // The argument to the function (*struct). 請求參數
Reply interface{} // The reply from the function (*struct). 返回參數
Error error // After completion, the error status. 錯誤狀態
Done chan *Call // Strobes when call is complete.
}
複製代碼
client.send
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
複製代碼
send 方法是將剛纔的call
結構體中的信息發給服務器,首先數據不是直接發給服務器的,而是將請求參數和服務器的方法先賦值給client結構體中的Request結構體,同時在賦值的過程須要加鎖。而後再調用Gob的WriteRequest方法,將數據刷到緩存區。
client.input
client.send
方法是將數據發給Server,而input則相反,獲取Server的返回結果Response給客戶端。func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
....
}
call.done()
}
}
...
}
}
複製代碼
主要邏輯是不斷循環讀取TCP上的流,把Header解析成Response對象,以及將Body解析到call.Reply對象,解析完後觸發call中的done函數。這樣客戶端就能夠拿到Reply對象就是服務器返回的結果,能夠打印獲取其中的值。
總結:
描述完這幾個方法,在回頭看開始的client.go
的流程圖就清晰了,能夠說是分兩條線,一條線顯示的調用發送請求數據,另一條線則起協程獲取服務器的返回數據。
話很少說,先來一張圖瞭解一下大概:
總體分三部分,第一部分註冊服務器定義的方法,第二部分監聽客戶端的請求,解析獲取到客戶端的請求參數。第三部分拿到請求參數執行服務器的調用函數,將返回結果返回給客戶端。
整個過程其實能夠對比是一次socket的調用過程。
register
首先來看一下server的結構體:type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
複製代碼
看英語註釋就比起清楚具體是作什麼的,Server存儲服務器的service以及其請求的Request和Response,這二個就是跟客戶端約定的協議,以下:
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
複製代碼
service 存儲服務器須要註冊的方法,methodType就是具體方法的屬性。
因此要想客戶端進行遠程調用服務器的方法,前提是在調用以前,服務器的方法均已加載在Server結構體中,因此須要服務器顯示的調用register方法,下面看一下里面核心的代碼:
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
...
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
...
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
...
}
...
}
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
argType := mtype.In(1)
...
replyType := mtype.In(2)
...
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
複製代碼
這段代碼就是經過反射把結構體實現的方法的一些屬性獲取到,包括自己可執行的方法對象、名稱、請求參數、返回參數。
最終存儲到server的serviceMap中。 客戶端調用服務器的方法的結構爲 struct.method,這樣只須要按 . 進行分割,拿到struct名稱和method名稱則能夠經過再serviceMap獲取到方法,執行得到結果。
註冊完方法後,接下來就是監聽客戶端的請求了。
Accept
先來看看 Accept
的代碼:
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
複製代碼
經過 net 包中的監聽tcp端口,而後起了一個協程,來看看這個協程裏作了什麼?
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
...
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
...
}
複製代碼
這段也好理解,ServeConn 將gob序列化的方法和conn保存到gobServerCodec結構體,而後調用了server.ServeCodec方法,這個方式作的事情就是將客戶端傳過來的包解析序列化解析,將請求參數,待返回的變量,以及是調服務器哪一個方法,這些均在上面的 server.readRequest方法處理。
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
...
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
...
dot := strings.LastIndex(req.ServiceMethod, ".")
...
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot+1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
...
svc = svci.(*service)
mtype = svc.method[methodName]
...
}
return
}
複製代碼
核心的功能再 readRequestHeader 中,作的一件事就是將客戶端傳過來的 struct.method,按 . 進行分割,而後拿到serviceName和methodName,而後再去server.serviceMap中拿到具體的服務和方法執行對象。
拿到以後,會起一個協程,調service.call方法,這裏面作的事情就是執行服務器服務的方法,拿到返回結果,再調用WriteReponse,將數據寫回去。而後客戶端的 input 方法循環獲取結果。這樣造成閉環。
下面看看service.call方法:
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
...
function := mtype.method.Func
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
...
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
...
}
複製代碼
實現的功能跟上面分析的同樣,經過mtype拿到函數對象,而後調用反射的Call方法執行獲得結果,最後調用server.sendResponse發送發回結果。
看到這裏再回過來看上面畫的Server代碼流程圖,就很是清晰了。
Go Rpc源碼解讀就到這裏。
Go RPC源碼目前官方已經沒有維護,官方推薦使用grpc,下一篇計劃分析grpc的源碼。
下面總結一下優缺點:
目前開源的RPC框架已經不是像這種簡單的網絡調用了,還會包括不少服務治理的功能,好比服務註冊與發現、限流熔斷、監控等等。這個之後接觸新的rpc再分享,最終達到能夠本身完整寫一個rpc框架的目的。
更多Rpc相關文章和討論,請關注公衆號:『 天澄技術雜談 』