在以前的兩篇文章rpc
和json-rpc
中,咱們介紹了 Go 標準庫提供的rpc
實現。在實際開發中,rpc
庫的功能仍是有所欠缺。今天咱們介紹一個很是優秀的 Go RPC 庫——rpcx
。rpcx
是一位國人大牛開發的,詳細開發歷程能夠在rpcx
官方博客瞭解。rpcx
擁有媲美,甚至某種程度上超越gRPC
的性能,有完善的中文文檔,提供服務發現和治理的插件。git
本文示例使用go modules
。github
首先是安裝:golang
$ go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
複製代碼
能夠看出rpcx
的安裝有點特殊。使用go get -v github.com/smallnest/rpcx/...
命令只會安裝rpcx
的基礎功能。擴展功能都是經過build tags
指定。爲了使用方便,通常安裝全部的tags
,如上面命令所示。這也是官方推薦的安裝方式。json
咱們先編寫服務端程序,實際上這個程序與用rpc
標準庫編寫的程序幾乎如出一轍:bash
package main
import (
"context"
"errors"
"github.com/smallnest/rpcx/server"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
s := server.NewServer()
s.RegisterName("Arith", new(Arith), "")
s.Serve("tcp", ":8972")
}
複製代碼
首先建立一個Server
對象,調用它的RegisterName()
方法在服務路徑Arith
下注冊Mul
和Div
方法。與標準庫相比,rpcx
要求註冊方法的第一個參數必須爲context.Context
類型。最後調用s.Serve("tcp", ":8972")
監聽 TCP 端口 8972。是否是很簡單?啓動服務器:服務器
$ go run main.go
複製代碼
而後是客戶端程序:微信
package main
import (
"context"
"flag"
"log"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", ":8972", "service address")
)
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A:10, B:20}
var reply int
err :=xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼
rpcx
支持多種服務發現的方式讓客戶端找到服務器。上面代碼中咱們使用的是最簡單的點到點的方式,也就是直連。要調用服務端的方法,必須先建立一個Client
對象。使用Client
對象來調用遠程方法。運行客戶端:網絡
$ go run main.go
10 * 20 = 200
50 * 20 = 2...10
複製代碼
注意到,建立Client
對象的參數有client.Failtry
和client.RandomSelect
。這兩個參數分別爲失敗模式和如何選擇服務器。dom
rpcx
支持多種傳輸協議:tcp
TCP
:TCP 協議,網絡名稱爲tcp
;HTTP
:HTTP 協議,網絡名稱爲http
;UnixDomain
:unix 域協議,網絡名稱爲unix
;QUIC
:是 Quick UDP Internet Connections 的縮寫,意爲快速UDP網絡鏈接。HTTP/3 底層就是 QUIC 協議,Google 出品。網絡名稱爲quic
;KCP
:快速而且可靠的 ARQ 協議,網絡名稱爲kcp
。rpcx
對這些協議作了很是好的封裝。除了在建立服務器和客戶端鏈接時須要指定協議名稱,其它時候的使用基本是透明的。咱們將上面的例子改裝成使用http
協議的:
服務端改動:
s.Serve("http", ":8972")
複製代碼
客戶端改動:
d := client.NewPeer2PeerDiscovery("http@"+*addr, "")
複製代碼
QUIC
和KCP
的使用有點特殊,QUIC
必須與 TLS 一塊兒使用,KCP
也須要作傳輸加密。使用 Go 語言咱們能很方便地生成一個證書和私鑰:
package main
import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"os"
"time"
)
func main() {
max := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, _ := rand.Int(rand.Reader, max)
subject := pkix.Name{
Organization: []string{"Go Daily Lib"},
OrganizationalUnit: []string{"TechBlog"},
CommonName: "go daily lib",
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: subject,
NotBefore: time.Now(),
NotAfter: time.Now().Add(365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
}
pk, _ := rsa.GenerateKey(rand.Reader, 2048)
derBytes, _ := x509.CreateCertificate(rand.Reader, &template, &template, &pk.PublicKey, pk)
certOut, _ := os.Create("server.pem")
pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
certOut.Close()
keyOut, _ := os.Create("server.key")
pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)})
keyOut.Close()
}
複製代碼
上面代碼生成了一個證書和私鑰,有效期爲 1 年。運行程序,獲得兩個文件server.pem
和server.key
。而後咱們就能夠編寫使用QUIC
協議的程序了。服務端:
func main() {
cert, _ := tls.LoadX509KeyPair("server.pem", "server.key")
config := &tls.Config{Certificates: []tls.Certificate{cert}}
s := server.NewServer(server.WithTLSConfig(config))
s.RegisterName("Arith", new(Arith), "")
s.Serve("quic", "localhost:8972")
}
複製代碼
實際上就是加載證書和密鑰,而後在建立Server
對象時做爲選項傳入。客戶端改動:
conf := &tls.Config{
InsecureSkipVerify: true,
}
option := client.DefaultOption
option.TLSConfig = conf
d := client.NewPeer2PeerDiscovery("quic@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()
複製代碼
客戶端也須要配置 TLS。
有一點須要注意,rpcx
對quic/kcp
這些協議的支持是經過build tags
實現的。默認不會編譯quic/kcp
相關文件。若是要使用,必須本身手動指定tags
。先啓動服務端程序:
$ go run -tags quic main.go
複製代碼
而後切換到客戶端程序目錄,執行下面命令:
$ go run -tags quic main.go
複製代碼
還有一點須要注意,在使用tcp
和http
(底層也是tcp
)協議的時候,咱們能夠將地址簡寫爲:8972
,由於默認就是本地地址。可是quic
不行,必須把地址寫完整:
// 服務端
s.Serve("quic", "localhost:8972")
// 客戶端
addr = flag.String("addr", "localhost:8972", "service address")
複製代碼
上面的例子都是調用對象的方法,咱們也能夠調用函數。函數的類型與對象方法相比只是沒有接收者。註冊函數須要指定一個服務路徑。服務端:
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func Mul(cxt context.Context, args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func Div(cxt context.Context, args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
s := server.NewServer()
s.RegisterFunction("function", Mul, "")
s.RegisterFunction("function", Div, "")
s.Serve("tcp", ":8972")
}
複製代碼
只是註冊方法由RegisterName
變爲了RegisterFunction
,參數由一個對象變爲一個函數。咱們須要爲註冊的函數指定一個服務路徑,客戶端調用時會根據這個路徑查找對應方法。客戶端:
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A: 10, B: 20}
var reply int
err := xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼
rpcx
支持多種註冊中心:
zookeeper
:經常使用的註冊中心;Etcd
:Go 語言編寫的註冊中心;Consul/mDNS
等。咱們以前演示的都是點對點的鏈接,接下來咱們介紹如何使用zookeeper
做爲註冊中心。在rpcx
中,註冊中心是經過插件的方式集成的。使用ZooKeeperRegisterPlugin
這個插件來集成Zookeeper
。服務端代碼:
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
var (
addr = flag.String("addr", ":8972", "service address")
zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
basePath = flag.String("basePath", "/services/math", "service base path")
)
type Arith int
func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
fmt.Println("Mul on", *addr)
*reply = args.A * args.B
return nil
}
func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
fmt.Println("Div on", *addr)
if args.B == 0 {
return errors.New("divide by 0")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func main() {
flag.Parse()
p := &serverplugin.ZooKeeperRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
ZooKeeperServers: []string{*zkAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
if err := p.Start(); err != nil {
log.Fatal(err)
}
s := server.NewServer()
s.Plugins.Add(p)
s.RegisterName("Arith", new(Arith), "")
s.Serve("tcp", *addr)
}
複製代碼
在ZooKeeperRegisterPlugin
中,咱們指定了本服務地址,zookeeper 集羣地址(能夠是多個),起始路徑等。服務器啓動時自動向 zookeeper 註冊本服務的信息,客戶端可直接從 zookeeper 拉取可用的服務列表。
首先啓動 zookeeper 服務器,zookeeper 的安裝與啓動能夠參考個人上一篇文章。分別在 3 個控制檯中啓動 3 個服務器,指定不一樣的端口(注意須要指定-tags zookeeper
):
// 控制檯1
$ go run -tags zookeeper main.go -addr 127.0.0.1:8971
// 控制檯2
$ go run -tags zookeeper main.go -addr 127.0.0.1:8972
// 控制檯3
$ go run -tags zookeeper main.go -addr 127.0.0.1:8973
複製代碼
啓動以後,咱們觀察 zookeeper 路徑/services/math
中的內容:
很是棒,可用的服務地址不用咱們手動維護了!
接下來是客戶端:
var (
zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
basePath = flag.String("basePath", "/services/math", "service base path")
)
func main() {
flag.Parse()
d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &Args{A: 10, B: 20}
var reply int
err := xclient.Call(context.Background(), "Mul", args, &reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
args = &Args{50, 20}
var quo Quotient
err = xclient.Call(context.Background(), "Div", args, &quo)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}
複製代碼
咱們經過 zookeeper 讀取可用的Arith
服務列表,而後隨機選擇一個服務發送請求:
$ go run -tags zookeeper main.go
2020/05/26 23:03:40 Connected to 127.0.0.1:2181
2020/05/26 23:03:40 authenticated: id=72057658440744975, timeout=10000
2020/05/26 23:03:40 re-submitting `0` credentials after reconnect
10 * 20 = 200
50 * 20 = 2...10
複製代碼
咱們的客戶端發送了兩條請求。因爲使用了client.RandomSelect
策略,因此這兩個請求隨機發送到某個服務端。我在Mul
和Div
方法中增長了一個打印,能夠觀察一下各個控制檯的輸出!
若是咱們關閉了某個服務器,對應的服務地址會從 zookeeper 中移除。我關閉了服務器 1,zookeeper 服務列表變爲:
相比上一篇文章中須要手動維護 zookeeper 的內容,rpcx
的自動註冊和維護明顯要方便太多了!
rpcx
是 Go 語言中數一數二的 rpc 庫,功能豐富,性能出衆,文檔豐富,已經被很多公司和我的採用。本文介紹的只是最基礎的功能,rpcx
支持各類路由選擇策略、分組、限流、身份認證等高級功能,推薦深刻學習!
你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
個人博客:darjun.github.io
歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~