Go 每日一庫之 rpc

簡介

RPC(Remote Procedure Call)是遠程方法調用的縮寫,它能夠經過網絡調用遠程對象的方法。Go 標準庫net/rpc提供了一個簡單、強大且高性能的 RPC 實現。僅需編寫不多的代碼就能實現 RPC 服務。本文就來介紹一下這個庫。git

快速使用

標準庫無需安裝。github

因爲是網絡程序,咱們須要編寫服務端和客戶端兩個程序。首先是服務端程序:golang

package main

import (
  "errors"
  "log"
  "net"
  "net/http"
  "net/rpc"
)

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A / args.B
  quo.Rem = args.A % args.B
  return nil
}

func main() {
  arith := new(Arith)
  rpc.Register(arith)
  rpc.HandleHTTP()
  if err := http.ListenAndServe(":1234", nil); err != nil {
    log.Fatal("serve error:", err)
  }
}

咱們定義了一個Arith類型,爲它編寫了兩個方法MultiplyDivide。建立Arith類型的對象arith,調用rpc.Register(arith)會註冊這兩個方法。rpc庫對註冊的方法有必定的限制,方法必須知足簽名func (t *T) MethodName(argType T1, replyType *T2) errorweb

  • 首先,方法必須是導出的(名字首字母大寫);
  • 其次,方法接受兩個參數,必須是導出的或內置類型。第一個參數表示客戶端傳遞過來的請求參數,第二個是須要返回給客戶端的響應。第二個參數必須爲指針類型(須要修改);
  • 最後,方法必須返回一個error類型的值。返回非nil的值,表示調用出錯。

rpc.HandleHTTP()註冊 HTTP 路由。http.ListenAndServe(":1234", nil)在端口1234上啓動一個 HTTP 服務,請求 rpc 方法會交給rpc內部路由處理。這樣咱們就能夠經過客戶端調用這兩個方法了:編程

package main

import (
  "fmt"
  "log"
  "net/rpc"
)

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

func main() {
  client, err := rpc.DialHTTP("tcp", ":1234")
  if err != nil {
    log.Fatal("dialing:", err)
  }

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)

  args = &Args{15, 6}
  var quo Quotient
  err = client.Call("Arith.Divide", args, &quo)
  if err != nil {
    log.Fatal("Divide error:", err)
  }
  fmt.Printf("Divide: %d/%d=%d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}

客戶端比服務端稍微簡單一點,咱們使用rpc.DialHTTP("tcp", ":1234")鏈接到服務端的監聽地址,返回一個 rpc 的客戶端對象。後續就能夠調用該對象的Call()方法調用服務端對象的對應方法,依次傳入方法名(須要加上類型限定)、參數、一個指針(用於接收返回值)。首先運行服務端程序:json

$ go run main.go

而後在一個新的控制檯中運行客戶端程序,輸出:瀏覽器

$ go run client.go
Multiply: 7*8=56
Divide: 15/6=2...3

net/http包不熟悉的童鞋可能會以爲奇怪,rpc.HandleHTTP()http.ListenAndServer(":1234", nil)是怎麼聯繫起來的?咱們簡單看一下源碼:安全

// src/net/rpc/server.go
const (
  // Defaults used by HandleHTTP
  DefaultRPCPath   = "/_goRPC_"
  DefaultDebugPath = "/debug/rpc"
)

func (server *Server) HandleHTTP(rpcPath, debugPath string) {
  http.Handle(rpcPath, server)
  http.Handle(debugPath, debugHTTP{server})
}

func HandleHTTP() {
  DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}

實際上,rpc.HandleHTTP()會調用http.Handle()在預約義的路徑上(/_goRPC_)註冊處理器。這個處理器最終被添加到net/http包中的默認多路複用器上:服務器

// src/net/http/server.go
func Handle(pattern string, handler Handler) {
  DefaultServeMux.Handle(pattern, handler)
}

http.ListenAndServer()第二個參數傳入nil時也是使用默認的多路複用器。具體能夠看看我以前的文章Go Web 編程之 程序結構微信

細心的朋友可能發現了,除了默認的路徑/_goRPC_用來處理 RPC 請求,rpc.HandleHTTP()方法還註冊了一個調試路徑/debug/rpc。咱們能夠直接在瀏覽器中訪問這個網址(須要服務端程序開啓。若是服務端在遠程,須要相應地修改地址)localhost:1234,直觀的查看各個方法的調用狀況:

異步調用

上面的例子中,咱們在客戶端使用了同步的調用方式,即一直等待服務端的響應或出錯。在等待的過程當中,客戶端就不能處理其它的任務了。固然,咱們也能夠採用異步的調用方式:

func main() {
  client, err := rpc.DialHTTP("tcp", ":1234")
  if err != nil {
    log.Fatal("dialing:", err)
  }

  args1 := &Args{7, 8}
  var reply int
  multiplyReply := client.Go("Arith.Multiply", args1, &reply, nil)

  args2 := &Args{15, 6}
  var quo Quotient
  divideReply := client.Go("Arith.Divide", args2, &quo, nil)

  ticker := time.NewTicker(time.Millisecond)
  defer ticker.Stop()

  var multiplyReplied, divideReplied bool
  for !multiplyReplied || !divideReplied {
    select {
    case replyCall := <-multiplyReply.Done:
      if err := replyCall.Error; err != nil {
        fmt.Println("Multiply error:", err)
      } else {
        fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply)
      }
      multiplyReplied = true
    case replyCall := <-divideReply.Done:
      if err := replyCall.Error; err != nil {
        fmt.Println("Divide error:", err)
      } else {
        fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem)
      }
      divideReplied = true
    case <-ticker.C:
      fmt.Println("tick")
    }
  }
}

異步調用使用client.Go()方法,參數與同步調用基本同樣。它返回一個rpc.Call對象:

// src/net/rpc/client.go
type Call struct {
  ServiceMethod string     
  Args          interface{}
  Reply         interface{}
  Error         error      
  Done          chan *Call 
}

咱們能夠經過該對象獲取這次調用的信息,如方法名、參數、返回值和錯誤。咱們經過監聽通道Done是否有值判斷調用是否完成。上面代碼中使用一個select語句輪詢兩次調用的狀態。注意一點,若是多個通道都有值,select執行哪一個case是隨機的。因此可能先輸出divide的信息:

$ go run client.go 
Divide: 15/6=2...3
Multiply: 7*8=56

服務端能夠繼續使用一開始的。

定製方法名

默認狀況下,rpc.Register()將方法接收者(receiver)的類型名做爲方法名前綴。咱們也能夠本身設置。這時須要調用RegisterName(name string, rcvr interface{}) error方法:

func main() {
  arith := new(Arith)
  rpc.RegisterName("math", arith)
  rpc.HandleHTTP()
  if err := http.ListenAndServe(":1234", nil); err != nil {
    log.Fatal("serve error:", err)
  }
}

上面咱們將註冊的方法名前綴改成math了,客戶端調用時傳入的方法名也須要相應的修改:

func main() {
  client, err := rpc.DialHTTP("tcp", ":1234")
  if err != nil {
    log.Fatal("dialing:", err)
  }

  args := &Args{7, 8}
  var reply int
  err = client.Call("math.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

TCP

上面咱們都是使用 HTTP 協議來實現 rpc 服務的,rpc庫也支持直接使用 TCP 協議。首先,服務端先調用net.Listen("tcp", ":1234")建立一個監聽某個 TCP 端口的監聽器(Accepter),而後使用rpc.Accept(l)在此監聽器上接受鏈接並處理:

func main() {
  l, err := net.Listen("tcp", ":1234")
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)
  rpc.Accept(l)
}

而後,客戶端調用rpc.Dial()以 TCP 協議鏈接到服務端:

func main() {
  client, err := rpc.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dialing:", err)
  }

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

本身接收鏈接

咱們能夠本身接受鏈接,而後在此鏈接上應用 rpc 協議:

func main() {
  l, err := net.Listen("tcp", ":1234")
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    go rpc.ServeConn(conn)
  }
}

這個客戶端與上面 TCP 的客戶端同樣,不用修改。

自定義編碼格式

默認客戶端與服務端之間的數據使用gob編碼,咱們可使用其它的格式來編碼。在服務端,咱們要實現rpc.ServerCodec接口:

// src/net/rpc/server.go
type ServerCodec interface {
  ReadRequestHeader(*Request) error
  ReadRequestBody(interface{}) error
  WriteResponse(*Response, interface{}) error

  Close() error
}

實際上不用這麼麻煩,咱們查看源碼看看gobServerCodec是怎麼實現的,而後仿造實現一個就好了。下面我實現了一個 JSON 格式的編解碼器:

type JsonServerCodec struct {
  rwc    io.ReadWriteCloser
  dec    *json.Decoder
  enc    *json.Encoder
  encBuf *bufio.Writer
  closed bool
}

func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec {
  buf := bufio.NewWriter(conn)
  return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false}
}

func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
  return c.dec.Decode(r)
}

func (c *JsonServerCodec) ReadRequestBody(body interface{}) error {
  return c.dec.Decode(body)
}

func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
  if err = c.enc.Encode(r); err != nil {
    if c.encBuf.Flush() == nil {
      log.Println("rpc: json error encoding response:", err)
      c.Close()
    }
    return
  }
  if err = c.enc.Encode(body); err != nil {
    if c.encBuf.Flush() == nil {
      log.Println("rpc: json error encoding body:", err)
      c.Close()
    }
    return
  }
  return c.encBuf.Flush()
}

func (c *JsonServerCodec) Close() error {
  if c.closed {
    return nil
  }
  c.closed = true
  return c.rwc.Close()
}

func main() {
  l, err := net.Listen("tcp", ":1234")
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    go rpc.ServeCodec(NewJsonServerCodec(conn))
  }
}

for循環中須要建立編解碼器JsonServerCodec傳給ServeCodec方法。一樣的,客戶端要實現rpc.ClientCodec接口,也是仿造gobClientCodec的實現:

type JsonClientCodec struct {
  rwc    io.ReadWriteCloser
  dec    *json.Decoder
  enc    *json.Encoder
  encBuf *bufio.Writer
}

func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec {
  encBuf := bufio.NewWriter(conn)
  return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf}
}

func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
  if err = c.enc.Encode(r); err != nil {
    return
  }
  if err = c.enc.Encode(body); err != nil {
    return
  }
  return c.encBuf.Flush()
}

func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error {
  return c.dec.Decode(r)
}

func (c *JsonClientCodec) ReadResponseBody(body interface{}) error {
  return c.dec.Decode(body)
}

func (c *JsonClientCodec) Close() error {
  return c.rwc.Close()
}

func main() {
  conn, err := net.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dial error:", err)
  }

  client := rpc.NewClientWithCodec(NewJsonClientCodec(conn))

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

要使用NewClientWithCodec以指定的編解碼器建立客戶端。

自定義服務器

實際上,上面咱們調用的方法rpc.Registerrpc.RegisterNamerpc.ServeConnrpc.ServeCodec都是轉而去調用默認DefaultServer的相關方法:

// src/net/rpc/server.go
var DefaultServer = NewServer()

func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

func RegisterName(name string, rcvr interface{}) error {
  return DefaultServer.RegisterName(name, rcvr)
}

func ServeConn(conn io.ReadWriteCloser) {
  DefaultServer.ServeConn(conn)
}

func ServeCodec(codec ServerCodec) {
  DefaultServer.ServeCodec(codec)
}

可是由於DefaultServer是全局共享的,若是有第三方庫使用了相關方法,而且註冊了一些對象的方法,咱們引用這個第三方庫以後,就出現兩個問題。第一,可能與咱們註冊的方法衝突;第二,帶來額外的安全隱患(庫中方法直接panic?)。故而推薦作法是本身NewServer

func main() {
  arith := new(Arith)
  server := rpc.NewServer()
  server.RegisterName("math", arith)
  server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)

  if err := http.ListenAndServe(":1234", nil); err != nil {
    log.Fatal("serve error:", err)
  }
}

這實際上是一個套路,不少庫會提供一個默認的實現直接使用,如lognet/http這些庫。可是也提供了建立和自定義的方法。通常測試時爲了方即可以使用默認實現,實踐中最好本身建立相應的對象,避免干擾和安全問題。

總結

本文介紹了 Go 標準庫中的rpc,它使用很是簡單,性能異常強大。不少rpc的第三方庫都是對rpc的封裝,早期版本的rpcx就是基於rpc作的封裝。

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. rpc 官方文檔https://golang.org/pkg/net/rpc/
  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索