Go 每日一庫之 rpcx

簡介

在以前的兩篇文章rpcjson-rpc中,咱們介紹了 Go 標準庫提供的rpc實現。在實際開發中,rpc庫的功能仍是有所欠缺。今天咱們介紹一個很是優秀的 Go RPC 庫——rpcxrpcx是一位國人大牛開發的,詳細開發歷程能夠在rpcx官方博客瞭解。rpcx擁有媲美,甚至某種程度上超越gRPC的性能,有完善的中文文檔,提供服務發現和治理的插件。git

快速使用

本文示例使用go modulesgithub

首先是安裝:golang

$ go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
複製代碼

能夠看出rpcx的安裝有點特殊。使用go get -v github.com/smallnest/rpcx/...命令只會安裝rpcx的基礎功能。擴展功能都是經過build tags指定。爲了使用方便,通常安裝全部的tags,如上面命令所示。這也是官方推薦的安裝方式。json

咱們先編寫服務端程序,實際上這個程序與用rpc標準庫編寫的程序幾乎如出一轍:bash

package main

import (
  "context"
  "errors"

  "github.com/smallnest/rpcx/server"
)

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

type Arith int

func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A / args.B
  quo.Rem = args.A % args.B
  return nil
}

func main() {
  s := server.NewServer()
  s.RegisterName("Arith", new(Arith), "")
  s.Serve("tcp", ":8972")
}
複製代碼

首先建立一個Server對象,調用它的RegisterName()方法在服務路徑Arith下注冊MulDiv方法。與標準庫相比,rpcx要求註冊方法的第一個參數必須爲context.Context類型。最後調用s.Serve("tcp", ":8972")監聽 TCP 端口 8972。是否是很簡單?啓動服務器:服務器

$ go run main.go
複製代碼

而後是客戶端程序:微信

package main

import (
  "context"
  "flag"
  "log"

  "github.com/smallnest/rpcx/client"
)

var (
  addr = flag.String("addr", ":8972", "service address")
)

func main() {
  flag.Parse()

  d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args := &Args{A:10, B:20}
  var reply int

  err :=xclient.Call(context.Background(), "Mul", args, &reply)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)

  args = &Args{50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, &quo)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼

rpcx支持多種服務發現的方式讓客戶端找到服務器。上面代碼中咱們使用的是最簡單的點到點的方式,也就是直連。要調用服務端的方法,必須先建立一個Client對象。使用Client對象來調用遠程方法。運行客戶端:網絡

$ go run main.go
10 * 20 = 200
50 * 20 = 2...10
複製代碼

注意到,建立Client對象的參數有client.Failtryclient.RandomSelect。這兩個參數分別爲失敗模式如何選擇服務器dom

傳輸

rpcx支持多種傳輸協議:tcp

  • TCP:TCP 協議,網絡名稱爲tcp
  • HTTP:HTTP 協議,網絡名稱爲http
  • UnixDomain:unix 域協議,網絡名稱爲unix
  • QUIC:是 Quick UDP Internet Connections 的縮寫,意爲快速UDP網絡鏈接。HTTP/3 底層就是 QUIC 協議,Google 出品。網絡名稱爲quic
  • KCP:快速而且可靠的 ARQ 協議,網絡名稱爲kcp

rpcx對這些協議作了很是好的封裝。除了在建立服務器和客戶端鏈接時須要指定協議名稱,其它時候的使用基本是透明的。咱們將上面的例子改裝成使用http協議的:

服務端改動:

s.Serve("http", ":8972")
複製代碼

客戶端改動:

d := client.NewPeer2PeerDiscovery("http@"+*addr, "")
複製代碼

QUICKCP的使用有點特殊,QUIC必須與 TLS 一塊兒使用,KCP也須要作傳輸加密。使用 Go 語言咱們能很方便地生成一個證書和私鑰:

package main

import (
  "crypto/rand"
  "crypto/rsa"
  "crypto/x509"
  "crypto/x509/pkix"
  "encoding/pem"
  "math/big"
  "net"
  "os"
  "time"
)

func main() {
  max := new(big.Int).Lsh(big.NewInt(1), 128)
  serialNumber, _ := rand.Int(rand.Reader, max)
  subject := pkix.Name{
    Organization:       []string{"Go Daily Lib"},
    OrganizationalUnit: []string{"TechBlog"},
    CommonName:         "go daily lib",
  }

  template := x509.Certificate{
    SerialNumber: serialNumber,
    Subject:      subject,
    NotBefore:    time.Now(),
    NotAfter:     time.Now().Add(365 * 24 * time.Hour),
    KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
    ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
    IPAddresses:  []net.IP{net.ParseIP("127.0.0.1")},
  }

  pk, _ := rsa.GenerateKey(rand.Reader, 2048)

  derBytes, _ := x509.CreateCertificate(rand.Reader, &template, &template, &pk.PublicKey, pk)
  certOut, _ := os.Create("server.pem")
  pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
  certOut.Close()

  keyOut, _ := os.Create("server.key")
  pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)})
  keyOut.Close()
}
複製代碼

上面代碼生成了一個證書和私鑰,有效期爲 1 年。運行程序,獲得兩個文件server.pemserver.key。而後咱們就能夠編寫使用QUIC協議的程序了。服務端:

func main() {
  cert, _ := tls.LoadX509KeyPair("server.pem", "server.key")
  config := &tls.Config{Certificates: []tls.Certificate{cert}}

  s := server.NewServer(server.WithTLSConfig(config))
  s.RegisterName("Arith", new(Arith), "")
  s.Serve("quic", "localhost:8972")
}
複製代碼

實際上就是加載證書和密鑰,而後在建立Server對象時做爲選項傳入。客戶端改動:

conf := &tls.Config{
  InsecureSkipVerify: true,
}

option := client.DefaultOption
option.TLSConfig = conf
d := client.NewPeer2PeerDiscovery("quic@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()
複製代碼

客戶端也須要配置 TLS。

有一點須要注意,rpcxquic/kcp這些協議的支持是經過build tags實現的。默認不會編譯quic/kcp相關文件。若是要使用,必須本身手動指定tags。先啓動服務端程序:

$ go run -tags quic main.go
複製代碼

而後切換到客戶端程序目錄,執行下面命令:

$ go run -tags quic main.go
複製代碼

還有一點須要注意,在使用tcphttp(底層也是tcp)協議的時候,咱們能夠將地址簡寫爲:8972,由於默認就是本地地址。可是quic不行,必須把地址寫完整:

// 服務端
s.Serve("quic", "localhost:8972")
// 客戶端
addr = flag.String("addr", "localhost:8972", "service address")
複製代碼

註冊函數

上面的例子都是調用對象的方法,咱們也能夠調用函數。函數的類型與對象方法相比只是沒有接收者。註冊函數須要指定一個服務路徑。服務端:

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}


func Mul(cxt context.Context, args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func Div(cxt context.Context, args *Args, quo *Quotient) error {
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A / args.B
  quo.Rem = args.A % args.B
  return nil
}

func main() {
  s := server.NewServer()
  s.RegisterFunction("function", Mul, "")
  s.RegisterFunction("function", Div, "")
  s.Serve("tcp", ":8972")
}
複製代碼

只是註冊方法由RegisterName變爲了RegisterFunction,參數由一個對象變爲一個函數。咱們須要爲註冊的函數指定一個服務路徑,客戶端調用時會根據這個路徑查找對應方法。客戶端:

func main() {
  flag.Parse()

  d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  xclient := client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args := &Args{A: 10, B: 20}
  var reply int

  err := xclient.Call(context.Background(), "Mul", args, &reply)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)

  args = &Args{50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, &quo)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼

註冊中心

rpcx支持多種註冊中心:

  • 點對點:其實就是直連,沒有註冊中心;
  • 點對多:能夠配置多個服務器;
  • zookeeper:經常使用的註冊中心;
  • Etcd:Go 語言編寫的註冊中心;
  • 進程內調用:方便調試功能,在同一個進程內查找服務;
  • Consul/mDNS等。

咱們以前演示的都是點對點的鏈接,接下來咱們介紹如何使用zookeeper做爲註冊中心。在rpcx中,註冊中心是經過插件的方式集成的。使用ZooKeeperRegisterPlugin這個插件來集成Zookeeper。服務端代碼:

type Args struct {
  A, B int
}

type Quotient struct {
  Quo, Rem int
}

var (
  addr     = flag.String("addr", ":8972", "service address")
  zkAddr   = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  basePath = flag.String("basePath", "/services/math", "service base path")
)

type Arith int

func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
  fmt.Println("Mul on", *addr)
  *reply = args.A * args.B
  return nil
}

func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
  fmt.Println("Div on", *addr)
  if args.B == 0 {
    return errors.New("divide by 0")
  }

  quo.Quo = args.A / args.B
  quo.Rem = args.A % args.B
  return nil
}

func main() {
  flag.Parse()

  p := &serverplugin.ZooKeeperRegisterPlugin{
    ServiceAddress:   "tcp@" + *addr,
    ZooKeeperServers: []string{*zkAddr},
    BasePath:         *basePath,
    Metrics:          metrics.NewRegistry(),
    UpdateInterval:   time.Minute,
  }
  if err := p.Start(); err != nil {
    log.Fatal(err)
  }

  s := server.NewServer()
  s.Plugins.Add(p)

  s.RegisterName("Arith", new(Arith), "")
  s.Serve("tcp", *addr)
}
複製代碼

ZooKeeperRegisterPlugin中,咱們指定了本服務地址,zookeeper 集羣地址(能夠是多個),起始路徑等。服務器啓動時自動向 zookeeper 註冊本服務的信息,客戶端可直接從 zookeeper 拉取可用的服務列表。

首先啓動 zookeeper 服務器,zookeeper 的安裝與啓動能夠參考個人上一篇文章。分別在 3 個控制檯中啓動 3 個服務器,指定不一樣的端口(注意須要指定-tags zookeeper):

// 控制檯1
$ go run -tags zookeeper main.go -addr 127.0.0.1:8971
// 控制檯2
$ go run -tags zookeeper main.go -addr 127.0.0.1:8972
// 控制檯3
$ go run -tags zookeeper main.go -addr 127.0.0.1:8973
複製代碼

啓動以後,咱們觀察 zookeeper 路徑/services/math中的內容:

很是棒,可用的服務地址不用咱們手動維護了!

接下來是客戶端:

var (
  zkAddr   = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  basePath = flag.String("basePath", "/services/math", "service base path")
)

func main() {
  flag.Parse()

  d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil)
  xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  defer xclient.Close()

  args := &Args{A: 10, B: 20}
  var reply int

  err := xclient.Call(context.Background(), "Mul", args, &reply)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)

  args = &Args{50, 20}
  var quo Quotient
  err = xclient.Call(context.Background(), "Div", args, &quo)
  if err != nil {
    log.Fatalf("failed to call: %v", err)
  }

  fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼

咱們經過 zookeeper 讀取可用的Arith服務列表,而後隨機選擇一個服務發送請求:

$ go run -tags zookeeper main.go
2020/05/26 23:03:40 Connected to 127.0.0.1:2181
2020/05/26 23:03:40 authenticated: id=72057658440744975, timeout=10000
2020/05/26 23:03:40 re-submitting `0` credentials after reconnect
10 * 20 = 200
50 * 20 = 2...10
複製代碼

咱們的客戶端發送了兩條請求。因爲使用了client.RandomSelect策略,因此這兩個請求隨機發送到某個服務端。我在MulDiv方法中增長了一個打印,能夠觀察一下各個控制檯的輸出!

若是咱們關閉了某個服務器,對應的服務地址會從 zookeeper 中移除。我關閉了服務器 1,zookeeper 服務列表變爲:

相比上一篇文章中須要手動維護 zookeeper 的內容,rpcx的自動註冊和維護明顯要方便太多了!

總結

rpcx是 Go 語言中數一數二的 rpc 庫,功能豐富,性能出衆,文檔豐富,已經被很多公司和我的採用。本文介紹的只是最基礎的功能,rpcx支持各類路由選擇策略、分組、限流、身份認證等高級功能,推薦深刻學習!

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. rpcx GitHub:github.com/smallnest/r…
  2. rpcx 博客:blog.rpcx.io/
  3. rpcx 官網:rpcx.io/
  4. rpcx 文檔:doc.rpcx.io/
  5. Go 每日一庫 GitHub:github.com/darjun/go-d…

個人博客:darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索