Go 每日一庫之 jsonrpc

簡介

上一篇文章中咱們介紹了 Go 標準庫net/rpc的用法。在默認狀況下,rpc庫內部使用gob格式傳輸數據。咱們仿造gob的編解碼器實現了一個json格式的。實際上標準庫net/rpc/jsonrcp中已有實現。本文是對上一篇文章的補充。git

快速使用

標準庫無需安裝。github

首先是服務端,使用net/rpc/jsonrpc以後,咱們就不用本身去編寫json的編解碼器了:golang

package main

import (
  "log"
  "net"
  "net/rpc"
  "net/rpc/jsonrpc"
)

type Args struct {
  A, B int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func main() {
  l, err := net.Listen("tcp", ":1234")
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    // 注意這一行
    go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  }
}

直接調用jsonrpc.NewServerCodec(conn)建立一個服務端的codec。客戶端也是相似的:json

func main() {
  conn, err := net.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dial error:", err)
  }

  // 這裏,這裏😁
  client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

先運行服務端程序:數組

$ go run main.go

而後在一個新的控制檯中運行客戶端程序:服務器

$ go run client.go
Multiply: 7*8=56

下面這段代碼基本上每一個使用jsonrpc的程序都要編寫:微信

conn, err := net.Dial("tcp", ":1234")
if err != nil {
  log.Fatal("dial error:", err)
}

client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

所以jsonrpc爲了方便直接提供了一個Dial方法。使用Dial簡化上面的客戶端程序:負載均衡

func main() {
  client, err := jsonrpc.Dial("tcp", ":1234")
  if err != nil {
    log.Fatal("dial error:", err)
  }

  args := &Args{7, 8}
  var reply int
  err = client.Call("Arith.Multiply", args, &reply)
  if err != nil {
    log.Fatal("Multiply error:", err)
  }
  fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

效果是同樣的。tcp

JSON-RPC 標準

JSON-RPC 1.0 標準在 2005 年發佈,通過數年演化,於 2010 年發佈了 2.0 版本。JSON-RPC 標準的內容可在https://www.jsonrpc.org/specification查看。Go 標準庫net/rpc/jsonrpc實現了 1.0 版本。關於 2.0 版本的實現能夠在pkg.go.dev上搜索json-rpc+2.0。本文以 1.0 版本爲基礎進行介紹。編輯器

JSON-RPC 傳輸的是單一的對象,序列化爲 JSON 格式。請求對象包含如下 3 個屬性:

  • method:請求調用的方法;
  • params:一個數組表示傳給方法的各個參數;
  • id:請求 ID。ID 能夠是任何類型,在收到響應時根據這個屬性判斷對應哪一個請求。

響應對象包含如下 3 個屬性:

  • result:方法返回的對象,若是error非空時,該屬性必須爲null
  • error:表示調用是否出錯;
  • id:對應請求的 ID。

另外標準還定義了一種通知類型,除了id屬性爲null以外,通知對象的屬性與請求對象徹底同樣。

調用client.Call("echo", "Hello JSON-RPC", &reply)時:

請求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1}
響應:{ "result": "Hello JSON-RPC", "error": null, "id": 1}

使用 zookeeper 實現簡單的負載均衡

下面咱們使用zookeeper實現一個簡單的客戶端側的負載均衡。zookeeper中記錄全部的可提供服務的服務器,客戶端每次請求時都隨機挑選一個。咱們的示例中,請求必須是無狀態的。首先,咱們改造一下服務端程序,將監聽地址提取出來,經過flag指定:

package main

import (
  "flag"
  "log"
  "net"
  "net/rpc"
  "net/rpc/jsonrpc"
)

var (
  addr *string
)

type Args struct {
  A, B int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
  *reply = args.A * args.B
  return nil
}

func init() {
  addr = flag.String("addr", ":1111", "addr to listen")
}

func main() {
  flag.Parse()

  l, err := net.Listen("tcp", *addr)
  if err != nil {
    log.Fatal("listen error:", err)
  }

  arith := new(Arith)
  rpc.Register(arith)

  for {
    conn, err := l.Accept()
    if err != nil {
      log.Fatal("accept error:", err)
    }

    go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  }
}

關於有哪些服務器可用,咱們存儲在zookeeper中。

首先要啓動一個zookeeper的程序。在 Apache Zookeeper 官網能夠下載能直接運行的 Windows 程序。下載以後解壓,將conf文件夾中的樣板配置zoo_sample.cfg複製一份,文件名改成zoo.cfg。在編輯器中打開zoo.cfg,將dataDir改成一個已存在的目錄,或建立一個新目錄。我在bin同級目錄中建立了一個data目錄,而後設置dataDir=../data。切換到bin目錄下執行zkServer.batzookeeper程序就運行起來了。使用zkClient.bat鏈接上這個zookeeper,增長一個節點,設置數據:

$ create /rpcserver
$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113

咱們用,分隔多個服務器地址。

準備工做完成後,接下來就開始編寫客戶端代碼了。咱們實現一個代理類,負責監聽zookeeper的數據變化,根據zookeeper中新的地址建立到服務器的鏈接,刪除老的鏈接,將調用請求隨機轉發到一個服務器處理:

type Proxy struct {
  zookeeper     string
  clients       map[string]*rpc.Client
  events        <-chan zk.Event
  zookeeperConn *zk.Conn
  mutex         sync.Mutex
}

func NewProxy(addr string) *Proxy {
  return &Proxy{
    zookeeper: addr,
    clients:   make(map[string]*rpc.Client),
  }
}

這裏咱們使用了go-zookeeper這個庫,須要額外安裝:

$ go get github.com/samuel/go-zookeeper/zk

程序啓動時,代理對象從zookeeper中獲取服務端地址,建立鏈接:

func (p *Proxy) Connect() {
  c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10)
  if err != nil {
    panic(err)
  }

  data, _, event, err := c.GetW("/rpcserver")
  if err != nil {
    panic(err)
  }

  p.events = event
  p.zookeeperConn = c

  p.CreateClients(string(data))
}

func (p *Proxy) CreateClients(server string) {
  p.mutex.Lock()
  defer p.mutex.Unlock()

  addrs := strings.Split(server, ",")
  allAddr := make(map[string]struct{})
  for _, addr := range addrs {
    allAddr[addr] = struct{}{}
    if _, exist := p.clients[addr]; exist {
      continue
    }

    client, err := jsonrpc.Dial("tcp", addr)
    if err != nil {
      log.Println("jsonrpc Dial error:", err)
      continue
    }

    p.clients[addr] = client
    log.Println("new addr:", addr)
  }

  for addr := range p.clients {
    if _, exist := allAddr[addr]; !exist {
      // 不在 zookeeper 中的地址,刪除對應鏈接
      oldClient.Close()
      delete(p.clients, addr)

      log.Println("delete addr", addr)
    }
  }
}

同時,須要監聽zookeeper中的數據變化,當新增或刪除某個服務端地址時,Proxy要及時更新鏈接:

func (p *Proxy) Run() {
  for {
    select {
    case event := <-p.events:
      if event.Type == zk.EventNodeDataChanged {
        data, _, err := p.zookeeperConn.Get("/rpcserver")
        if err != nil {
          log.Println("get zookeeper data failed:", err)
          continue
        }

        p.CreateClients(string(data))
      }
    }
  }
}

客戶端主體程序使用Proxy結構很是方便:

package main

import (
  "flag"
  "fmt"
  "math/rand"
)

var (
  zookeeperAddr *string
)

func init() {
  zookeeperAddr = flag.String("addr", ":2181", "zookeeper address")
}

type Args struct {
  A, B int
}

func main() {
  flag.Parse()

  fmt.Println(*zookeeperAddr)
  p := NewProxy(*zookeeperAddr)
  p.Connect()

  go p.Run()

  for i := 0; i < 10; i++ {
    var reply int
    args := &Args{rand.Intn(1000), rand.Intn(1000)}
    p.Call("Arith.Multiply", args, &reply)
    fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  }

  // sleep 過程當中能夠修改 zookeeper 中的數據
  time.Sleep(1 * time.Minute)

  // 使用新的地址作隨機
  for i := 0; i < 100; i++ {
    var reply int
    args := &Args{rand.Intn(1000), rand.Intn(1000)}
    p.Call("Arith.Multiply", args, &reply)
    fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  }
}

建立一個代理對象,在一個新的 goroutine 中監聽zookeeper事件。而後經過ProxyCall調用遠程服務端的方法:

func (p *Proxy) Call(method string, args interface{}, reply interface{}) error {
  var client *rpc.Client
  var addr string
  idx := rand.Int31n(int32(len(p.clients)))
  var i int32
  p.mutex.Lock()
  for a, c := range p.clients {
    if i == idx {
      client = c
      addr = a
      break
    }
    i++
  }
  p.mutex.Unlock()

  fmt.Println("use", addr)
  return client.Call(method, args, reply)
}

首先咱們要啓動 3 個服務端程序,分別監聽端口 11十一、11十二、1113,須要 3 個控制檯:

控制檯 1:

$ go run main.go -addr :1111

控制檯 2:

$ go run main.go -addr :1112

控制檯 3:

$ go run main.go -addr :1113

客戶端在一個新的控制檯啓動,指定zookeeper地址:

$ go run . -addr=127.0.0.1:2181

在輸出中,咱們能夠看到是怎麼隨機挑選服務器的。

咱們能夠嘗試在客戶端程序運行的過程當中,將某個服務器地址從zookeeper中刪除。我特地在程序中加了一個 1 分鐘的延遲。在sleep過程當中,經過zkClient.cmd127.0.0.1:1113這個地址從zookeeper中刪除:

$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112

控制檯輸出:

$ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113

而且後續的請求不會再發到127.0.0.1:1113這個服務器了。

其實,在實際的項目中,Proxy通常是一個獨立的服務器,而不是放在客戶端側。上面示例這樣處理只是爲了方便。

總結

RPC 底層可使用各類協議傳輸數據,JSON/XML/Protobuf 均可以。對 rpc 感興趣的建議看看rpcx這個庫,https://github.com/smallnest/rpcx。很是強大!

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. jsonrpc GitHub:https://golang.org/pkg/net/rpc/jsonrpc/
  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索