在上一篇文章中咱們介紹了 Go 標準庫net/rpc
的用法。在默認狀況下,rpc
庫內部使用gob
格式傳輸數據。咱們仿造gob
的編解碼器實現了一個json
格式的。實際上標準庫net/rpc/jsonrcp
中已有實現。本文是對上一篇文章的補充。git
標準庫無需安裝。github
首先是服務端,使用net/rpc/jsonrpc
以後,咱們就不用本身去編寫json
的編解碼器了:golang
package main import ( "log" "net" "net/rpc" "net/rpc/jsonrpc" ) type Args struct { A, B int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func main() { l, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } // 注意這一行 go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
直接調用jsonrpc.NewServerCodec(conn)
建立一個服務端的codec
。客戶端也是相似的:json
func main() { conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } // 這裏,這裏😁 client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
先運行服務端程序:數組
$ go run main.go
而後在一個新的控制檯中運行客戶端程序:服務器
$ go run client.go Multiply: 7*8=56
下面這段代碼基本上每一個使用jsonrpc
的程序都要編寫:微信
conn, err := net.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
所以jsonrpc
爲了方便直接提供了一個Dial
方法。使用Dial
簡化上面的客戶端程序:負載均衡
func main() { client, err := jsonrpc.Dial("tcp", ":1234") if err != nil { log.Fatal("dial error:", err) } args := &Args{7, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { log.Fatal("Multiply error:", err) } fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply) }
效果是同樣的。tcp
JSON-RPC 1.0 標準在 2005 年發佈,通過數年演化,於 2010 年發佈了 2.0 版本。JSON-RPC 標準的內容可在https://www.jsonrpc.org/specification查看。Go 標準庫net/rpc/jsonrpc
實現了 1.0 版本。關於 2.0 版本的實現能夠在pkg.go.dev
上搜索json-rpc+2.0
。本文以 1.0 版本爲基礎進行介紹。編輯器
JSON-RPC 傳輸的是單一的對象,序列化爲 JSON 格式。請求對象包含如下 3 個屬性:
method
:請求調用的方法;params
:一個數組表示傳給方法的各個參數;id
:請求 ID。ID 能夠是任何類型,在收到響應時根據這個屬性判斷對應哪一個請求。響應對象包含如下 3 個屬性:
result
:方法返回的對象,若是error
非空時,該屬性必須爲null
;error
:表示調用是否出錯;id
:對應請求的 ID。另外標準還定義了一種通知類型,除了id
屬性爲null
以外,通知對象的屬性與請求對象徹底同樣。
調用client.Call("echo", "Hello JSON-RPC", &reply)
時:
請求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1} 響應:{ "result": "Hello JSON-RPC", "error": null, "id": 1}
下面咱們使用zookeeper
實現一個簡單的客戶端側的負載均衡。zookeeper
中記錄全部的可提供服務的服務器,客戶端每次請求時都隨機挑選一個。咱們的示例中,請求必須是無狀態的。首先,咱們改造一下服務端程序,將監聽地址提取出來,經過flag
指定:
package main import ( "flag" "log" "net" "net/rpc" "net/rpc/jsonrpc" ) var ( addr *string ) type Args struct { A, B int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error { *reply = args.A * args.B return nil } func init() { addr = flag.String("addr", ":1111", "addr to listen") } func main() { flag.Parse() l, err := net.Listen("tcp", *addr) if err != nil { log.Fatal("listen error:", err) } arith := new(Arith) rpc.Register(arith) for { conn, err := l.Accept() if err != nil { log.Fatal("accept error:", err) } go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
關於有哪些服務器可用,咱們存儲在zookeeper
中。
首先要啓動一個zookeeper
的程序。在 Apache Zookeeper 官網能夠下載能直接運行的 Windows 程序。下載以後解壓,將conf
文件夾中的樣板配置zoo_sample.cfg
複製一份,文件名改成zoo.cfg
。在編輯器中打開zoo.cfg
,將dataDir
改成一個已存在的目錄,或建立一個新目錄。我在bin
同級目錄中建立了一個data
目錄,而後設置dataDir=../data
。切換到bin
目錄下執行zkServer.bat
,zookeeper
程序就運行起來了。使用zkClient.bat
鏈接上這個zookeeper
,增長一個節點,設置數據:
$ create /rpcserver $ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113
咱們用,
分隔多個服務器地址。
準備工做完成後,接下來就開始編寫客戶端代碼了。咱們實現一個代理類,負責監聽zookeeper
的數據變化,根據zookeeper
中新的地址建立到服務器的鏈接,刪除老的鏈接,將調用請求隨機轉發到一個服務器處理:
type Proxy struct { zookeeper string clients map[string]*rpc.Client events <-chan zk.Event zookeeperConn *zk.Conn mutex sync.Mutex } func NewProxy(addr string) *Proxy { return &Proxy{ zookeeper: addr, clients: make(map[string]*rpc.Client), } }
這裏咱們使用了go-zookeeper
這個庫,須要額外安裝:
$ go get github.com/samuel/go-zookeeper/zk
程序啓動時,代理對象從zookeeper
中獲取服務端地址,建立鏈接:
func (p *Proxy) Connect() { c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10) if err != nil { panic(err) } data, _, event, err := c.GetW("/rpcserver") if err != nil { panic(err) } p.events = event p.zookeeperConn = c p.CreateClients(string(data)) } func (p *Proxy) CreateClients(server string) { p.mutex.Lock() defer p.mutex.Unlock() addrs := strings.Split(server, ",") allAddr := make(map[string]struct{}) for _, addr := range addrs { allAddr[addr] = struct{}{} if _, exist := p.clients[addr]; exist { continue } client, err := jsonrpc.Dial("tcp", addr) if err != nil { log.Println("jsonrpc Dial error:", err) continue } p.clients[addr] = client log.Println("new addr:", addr) } for addr := range p.clients { if _, exist := allAddr[addr]; !exist { // 不在 zookeeper 中的地址,刪除對應鏈接 oldClient.Close() delete(p.clients, addr) log.Println("delete addr", addr) } } }
同時,須要監聽zookeeper
中的數據變化,當新增或刪除某個服務端地址時,Proxy
要及時更新鏈接:
func (p *Proxy) Run() { for { select { case event := <-p.events: if event.Type == zk.EventNodeDataChanged { data, _, err := p.zookeeperConn.Get("/rpcserver") if err != nil { log.Println("get zookeeper data failed:", err) continue } p.CreateClients(string(data)) } } } }
客戶端主體程序使用Proxy
結構很是方便:
package main import ( "flag" "fmt" "math/rand" ) var ( zookeeperAddr *string ) func init() { zookeeperAddr = flag.String("addr", ":2181", "zookeeper address") } type Args struct { A, B int } func main() { flag.Parse() fmt.Println(*zookeeperAddr) p := NewProxy(*zookeeperAddr) p.Connect() go p.Run() for i := 0; i < 10; i++ { var reply int args := &Args{rand.Intn(1000), rand.Intn(1000)} p.Call("Arith.Multiply", args, &reply) fmt.Printf("%d*%d=%d\n", args.A, args.B, reply) } // sleep 過程當中能夠修改 zookeeper 中的數據 time.Sleep(1 * time.Minute) // 使用新的地址作隨機 for i := 0; i < 100; i++ { var reply int args := &Args{rand.Intn(1000), rand.Intn(1000)} p.Call("Arith.Multiply", args, &reply) fmt.Printf("%d*%d=%d\n", args.A, args.B, reply) } }
建立一個代理對象,在一個新的 goroutine 中監聽zookeeper
事件。而後經過Proxy
的Call
調用遠程服務端的方法:
func (p *Proxy) Call(method string, args interface{}, reply interface{}) error { var client *rpc.Client var addr string idx := rand.Int31n(int32(len(p.clients))) var i int32 p.mutex.Lock() for a, c := range p.clients { if i == idx { client = c addr = a break } i++ } p.mutex.Unlock() fmt.Println("use", addr) return client.Call(method, args, reply) }
首先咱們要啓動 3 個服務端程序,分別監聽端口 11十一、11十二、1113,須要 3 個控制檯:
控制檯 1:
$ go run main.go -addr :1111
控制檯 2:
$ go run main.go -addr :1112
控制檯 3:
$ go run main.go -addr :1113
客戶端在一個新的控制檯啓動,指定zookeeper
地址:
$ go run . -addr=127.0.0.1:2181
在輸出中,咱們能夠看到是怎麼隨機挑選服務器的。
咱們能夠嘗試在客戶端程序運行的過程當中,將某個服務器地址從zookeeper
中刪除。我特地在程序中加了一個 1 分鐘的延遲。在sleep
過程當中,經過zkClient.cmd
將127.0.0.1:1113
這個地址從zookeeper
中刪除:
$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112
控制檯輸出:
$ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113
而且後續的請求不會再發到127.0.0.1:1113
這個服務器了。
其實,在實際的項目中,Proxy
通常是一個獨立的服務器,而不是放在客戶端側。上面示例這樣處理只是爲了方便。
RPC 底層可使用各類協議傳輸數據,JSON/XML/Protobuf 均可以。對 rpc 感興趣的建議看看rpcx
這個庫,https://github.com/smallnest/rpcx。很是強大!
你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~