RPC(Remote Procedure Call)是遠程方法調用的縮寫,它能夠經過網絡調用遠程對象的方法。Go 標準庫net/rpc
提供了一個簡單、強大且高性能的 RPC 實現。僅需編寫不多的代碼就能實現 RPC 服務。本文就來介紹一下這個庫。git
標準庫無需安裝。github
因爲是網絡程序,咱們須要編寫服務端和客戶端兩個程序。首先是服務端程序:golang
package main import ( "errors" "log" "net" "net/http" "net/rpc" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func (t *Arith) Divide(args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by 0") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { arith := new(Arith) rpc.Register(arith) rpc.HandleHTTP() if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
咱們定義了一個Arith
類型,爲它編寫了兩個方法Multiply
和Divide
。建立Arith
類型的對象arith
,調用rpc.Register(arith)
會註冊這兩個方法。rpc
庫對註冊的方法有必定的限制,方法必須知足簽名func (t *T) MethodName(argType T1, replyType *T2) error
:web
error
類型的值。返回非nil
的值,表示調用出錯。rpc.HandleHTTP()
註冊 HTTP 路由。http.ListenAndServe(":1234", nil)
在端口1234
上啓動一個 HTTP 服務,請求 rpc 方法會交給rpc
內部路由處理。這樣咱們就能夠經過客戶端調用這兩個方法了:編程
package main import ( "fmt" "log" "net/rpc" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) args = &Args{15, 6} var quo Quotient err = client.Call("Arith.Divide", args, &quo) if err != nil { log.Fatal("Divide error:", err) } fmt.Printf("Divide: %d/%d=%d...%d\n", args.A, args.B, quo.Quo, quo.Rem) }
客戶端比服務端稍微簡單一點,咱們使用rpc.DialHTTP("tcp", ":1234")
鏈接到服務端的監聽地址,返回一個 rpc 的客戶端對象。後續就能夠調用該對象的Call()
方法調用服務端對象的對應方法,依次傳入方法名(須要加上類型限定)、參數、一個指針(用於接收返回值)。首先運行服務端程序:json
$ go run main.go
而後在一個新的控制檯中運行客戶端程序,輸出:瀏覽器
$ go run client.go Multiply: 7*8=56 Divide: 15/6=2...3
對net/http
包不熟悉的童鞋可能會以爲奇怪,rpc.HandleHTTP()
與http.ListenAndServer(":1234", nil)
是怎麼聯繫起來的?咱們簡單看一下源碼:安全
// src/net/rpc/server.go const ( // Defaults used by HandleHTTP DefaultRPCPath = "/_goRPC_" DefaultDebugPath = "/debug/rpc" ) func (server *Server) HandleHTTP(rpcPath, debugPath string) { http.Handle(rpcPath, server) http.Handle(debugPath, debugHTTP{server}) } func HandleHTTP() { DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath) }
實際上,rpc.HandleHTTP()
會調用http.Handle()
在預約義的路徑上(/_goRPC_
)註冊處理器。這個處理器最終被添加到net/http
包中的默認多路複用器上:服務器
// src/net/http/server.go func Handle(pattern string, handler Handler) { DefaultServeMux.Handle(pattern, handler) }
而http.ListenAndServer()
第二個參數傳入nil
時也是使用默認的多路複用器。具體能夠看看我以前的文章Go Web 編程之 程序結構。微信
細心的朋友可能發現了,除了默認的路徑/_goRPC_
用來處理 RPC 請求,rpc.HandleHTTP()
方法還註冊了一個調試路徑/debug/rpc
。咱們能夠直接在瀏覽器中訪問這個網址(須要服務端程序開啓。若是服務端在遠程,須要相應地修改地址)localhost:1234,直觀的查看各個方法的調用狀況:
上面的例子中,咱們在客戶端使用了同步的調用方式,即一直等待服務端的響應或出錯。在等待的過程當中,客戶端就不能處理其它的任務了。固然,咱們也能夠採用異步的調用方式:
func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args1 := &Args{7, 8} var reply int multiplyReply := client.Go("Arith.Multiply", args1, &reply, nil) args2 := &Args{15, 6} var quo Quotient divideReply := client.Go("Arith.Divide", args2, &quo, nil) ticker := time.NewTicker(time.Millisecond) defer ticker.Stop() var multiplyReplied, divideReplied bool for !multiplyReplied || !divideReplied { select { case replyCall := <-multiplyReply.Done: if err := replyCall.Error; err != nil { fmt.Println("Multiply error:", err) } else { fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply) } multiplyReplied = true case replyCall := <-divideReply.Done: if err := replyCall.Error; err != nil { fmt.Println("Divide error:", err) } else { fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem) } divideReplied = true case <-ticker.C: fmt.Println("tick") } } }
異步調用使用client.Go()
方法,參數與同步調用基本同樣。它返回一個rpc.Call
對象:
// src/net/rpc/client.go type Call struct { ServiceMethod string Args interface{} Reply interface{} Error error Done chan *Call }
咱們能夠經過該對象獲取這次調用的信息,如方法名、參數、返回值和錯誤。咱們經過監聽通道Done
是否有值判斷調用是否完成。上面代碼中使用一個select
語句輪詢兩次調用的狀態。注意一點,若是多個通道都有值,select
執行哪一個case
是隨機的。因此可能先輸出divide
的信息:
$ go run client.go Divide: 15/6=2...3 Multiply: 7*8=56
服務端能夠繼續使用一開始的。
默認狀況下,rpc.Register()
將方法接收者(receiver
)的類型名做爲方法名前綴。咱們也能夠本身設置。這時須要調用RegisterName(name string, rcvr interface{}) error
方法:
func main() { arith := new(Arith) rpc.RegisterName("math", arith) rpc.HandleHTTP() if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
上面咱們將註冊的方法名前綴改成math
了,客戶端調用時傳入的方法名也須要相應的修改:
func main() { client, err := rpc.DialHTTP("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("math.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
上面咱們都是使用 HTTP 協議來實現 rpc 服務的,rpc
庫也支持直接使用 TCP 協議。首先,服務端先調用net.Listen("tcp", ":1234")
建立一個監聽某個 TCP 端口的監聽器(Accepter),而後使用rpc.Accept(l)
在此監聽器上接受鏈接並處理:
func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) rpc.Accept(l) }
而後,客戶端調用rpc.Dial()
以 TCP 協議鏈接到服務端:
func main() { client, err := rpc.Dial("tcp", ":1234") if err != nil { log.Fatal("dialing:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
咱們能夠本身接受鏈接,而後在此鏈接上應用 rpc 協議:
func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeConn(conn) } }
這個客戶端與上面 TCP 的客戶端同樣,不用修改。
默認客戶端與服務端之間的數據使用gob
編碼,咱們可使用其它的格式來編碼。在服務端,咱們要實現rpc.ServerCodec
接口:
// src/net/rpc/server.go type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(interface{}) error WriteResponse(*Response, interface{}) error Close() error }
實際上不用這麼麻煩,咱們查看源碼看看gobServerCodec
是怎麼實現的,而後仿造實現一個就好了。下面我實現了一個 JSON 格式的編解碼器:
type JsonServerCodec struct { rwc io.ReadWriteCloser dec *json.Decoder enc *json.Encoder encBuf *bufio.Writer closed bool } func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec { buf := bufio.NewWriter(conn) return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false} } func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error { return c.dec.Decode(r) } func (c *JsonServerCodec) ReadRequestBody(body interface{}) error { return c.dec.Decode(body) } func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: json error encoding response:", err) c.Close() } return } if err = c.enc.Encode(body); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: json error encoding body:", err) c.Close() } return } return c.encBuf.Flush() } func (c *JsonServerCodec) Close() error { if c.closed { return nil } c.closed = true return c.rwc.Close() } func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeCodec(NewJsonServerCodec(conn)) } }
在for
循環中須要建立編解碼器JsonServerCodec
傳給ServeCodec
方法。一樣的,客戶端要實現rpc.ClientCodec
接口,也是仿造gobClientCodec
的實現:
type JsonClientCodec struct { rwc io.ReadWriteCloser dec *json.Decoder enc *json.Encoder encBuf *bufio.Writer } func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec { encBuf := bufio.NewWriter(conn) return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf} } func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { return } if err = c.enc.Encode(body); err != nil { return } return c.encBuf.Flush() } func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error { return c.dec.Decode(r) } func (c *JsonClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) } func (c *JsonClientCodec) Close() error { return c.rwc.Close() } func main() { conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } client := rpc.NewClientWithCodec(NewJsonClientCodec(conn)) args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
要使用NewClientWithCodec
以指定的編解碼器建立客戶端。
實際上,上面咱們調用的方法rpc.Register
,rpc.RegisterName
,rpc.ServeConn
,rpc.ServeCodec
都是轉而去調用默認DefaultServer
的相關方法:
// src/net/rpc/server.go var DefaultServer = NewServer() func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } func RegisterName(name string, rcvr interface{}) error { return DefaultServer.RegisterName(name, rcvr) } func ServeConn(conn io.ReadWriteCloser) { DefaultServer.ServeConn(conn) } func ServeCodec(codec ServerCodec) { DefaultServer.ServeCodec(codec) }
可是由於DefaultServer
是全局共享的,若是有第三方庫使用了相關方法,而且註冊了一些對象的方法,咱們引用這個第三方庫以後,就出現兩個問題。第一,可能與咱們註冊的方法衝突;第二,帶來額外的安全隱患(庫中方法直接panic
?)。故而推薦作法是本身NewServer
:
func main() { arith := new(Arith) server := rpc.NewServer() server.RegisterName("math", arith) server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath) if err := http.ListenAndServe(":1234", nil); err != nil { log.Fatal("serve error:", err) } }
這實際上是一個套路,不少庫會提供一個默認的實現直接使用,如log
、net/http
這些庫。可是也提供了建立和自定義的方法。通常測試時爲了方即可以使用默認實現,實踐中最好本身建立相應的對象,避免干擾和安全問題。
本文介紹了 Go 標準庫中的rpc
,它使用很是簡單,性能異常強大。不少rpc
的第三方庫都是對rpc
的封裝,早期版本的rpcx
就是基於rpc
作的封裝。
你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~