Go gRPC流模式的實現和TLS加密通訊

gRPC主要有4種請求和響應模式,分別是簡單模式(Simple RPC)服務端流式(Server-side streaming RPC)客戶端流式(Client-side streaming RPC)、和雙向流式(Bidirectional streaming RPC)html

1.簡單模式(Simple RPC):客戶端發起請求並等待服務端響應。git

2.服務端流式(Server-side streaming RPC):客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到裏面沒有任何消息。 場景:.客戶端要獲取某原油股的實時走勢,客戶端發送一個請求, 服務端實時返回該股票的走勢github

3.客戶端流式(Client-side streaming RPC):與服務端數據流模式相反,此次是客戶端源源不斷的向服務端發送數據流,而在發送結束後,由服務端返回一個響應。情景模擬:客戶端大量數據上傳到服務端golang

4.雙向流式(Bidirectional streaming RPC):雙方使用讀寫流去發送一個消息序列,兩個流獨立操做,雙方能夠同時發送和同時接收。 情景模擬:雙方對話(能夠一問一答、一問多答、多問一答,形式靈活)api

從上面的定義不難看出,用stream能夠定義一個流式消息。下面咱們就經過實例來演示一下流式通訊的使用方法。服務器

1.首先/api/hello.proto [以下], 而且生成新的api/hello.pb.go代碼。app

syntax = "proto3";
 
package api;
// Any消息類型容許您將消息做爲嵌入類型,而不須要它們 .proto定義。Any包含任意序列化的消息(字節),以及一個URL,該URL充當該消息的全局惟一標識符並解析爲該消息的類型。要使用Any類型,你須要導入google/protobuf/any.proto.
import "google/protobuf/any.proto";
 
message HelloRequest {
  string greeting = 1;
  map<string, string> infos  = 2;
}
 
message HelloResponse {
  string reply = 1;
  repeated google.protobuf.Any details = 2;
}
 
service HelloService {
  rpc SayHello(HelloRequest) returns (HelloResponse){}
  rpc ListHello(HelloRequest) returns (stream HelloResponse) {}
  rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {}
  rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {}
}
 
message Hello {
    string msg = 1;
}
 
message Error {
    repeated string msg = 1;
}

2.編譯指令:tcp

protoc -ID:\Go\include -I. --go_out=plugins=grpc:. ./api/api.proto

3.在生成的代碼api.hello.go中,咱們能夠看到客戶端接口以下:ide

type HelloServiceServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
    ListHello(*HelloRequest, HelloService_ListHelloServer) error
    SayMoreHello(HelloService_SayMoreHelloServer) error
    SayHelloChat(HelloService_SayHelloChatServer) error
}

4.接口的實現 server/service/service.go以下:工具

package service
 
import (
    "context"
    "fmt"
    "io"
    "log"
    "time"
 
    "github.com/golang/protobuf/ptypes"
    "github.com/golang/protobuf/ptypes/any"
 
    api "gogrpcstream/api"
)
 
type SayHelloServer struct{}
 
func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (res *api.HelloResponse, err error) {
    log.Printf("Client Greeting:%s", in.Greeting)
    log.Printf("Client Info:%v", in.Infos)
 
    var an *any.Any
    if in.Infos["hello"] == "world" {
        an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"})
    } else {
        an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}})
    }
 
    if err != nil {
        return
    }
    return &api.HelloResponse{
        Reply:   "Hello World !!",
        Details: []*any.Any{an},
    }, nil
}
 
// 服務器端流式 RPC, 接收一次客戶端請求,返回一個流
func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error {
    log.Printf("Client Say: %v", in.Greeting)
 
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 1"})
    time.Sleep(1 * time.Second)
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 2"})
    time.Sleep(1 * time.Second)
    stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 3"})
    time.Sleep(1 * time.Second)
    return nil
}
 
// 客戶端流式 RPC, 客戶端流式請求,服務器可返回一次
func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error {
    // 接受客戶端請求
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
 
        if err != nil {
            return err
        }
 
        log.Printf("SayMoreHello Client Say: %v", req.Greeting)
    }
 
    // 流讀取完成後,返回
    return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"})
}
 
//雙向
func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error {
    n := 1
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
 
        if err != nil {
            return err
        }
        err = stream.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)})
        if err != nil {
            return err
        }
        n++
        log.Printf("SayHelloChat Client Say: %v", req.Greeting)
    }
    return nil
}

5. server/main.go 服務端實現:

package main
 
import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
    "net"
 
    "google.golang.org/grpc/credentials"
 
    "google.golang.org/grpc"
 
    api "gogrpcstream/api"
    sv "gogrpcstream/server/service"
)
 
func main() {
    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }
 
    // 加載證書和密鑰 (同時能驗證證書與私鑰是否匹配)
    cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key")
    if err != nil {
        panic(err)
    }
 
    // 將根證書加入證書詞
    // 測試證書的根若是不加入可信池,那麼測試證書將視爲不惋惜,沒法經過驗證。
    certPool := x509.NewCertPool()
    rootBuf, err := ioutil.ReadFile("../certs/ca.pem")
    if err != nil {
        panic(err)
    }
 
    if !certPool.AppendCertsFromPEM(rootBuf) {
        panic("fail to append test ca")
    }
 
    tlsConf := &tls.Config{
        ClientAuth:   tls.RequireAndVerifyClientCert,
        Certificates: []tls.Certificate{cert},
        ClientCAs:    certPool,
    }
 
    serverOpt := grpc.Creds(credentials.NewTLS(tlsConf))
    grpcServer := grpc.NewServer(serverOpt)
 
    api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{})
 
    log.Println("Server Start...")
    grpcServer.Serve(lis)
}

6.客服端實現:client/main.go

package main
 
import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io"
    "io/ioutil"
    "log"
 
    api "gogrpcstream/api"
 
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)
 
func main() {
    cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key")
    if err != nil {
        panic(err)
    }
 
    // 將根證書加入證書池
    certPool := x509.NewCertPool()
    bs, err := ioutil.ReadFile("../certs/ca.pem")
    if err != nil {
        panic(err)
    }
 
    if !certPool.AppendCertsFromPEM(bs) {
        panic("cc")
    }
 
    // 新建憑證
    transportCreds := credentials.NewTLS(&tls.Config{
        ServerName:   "localhost",
        Certificates: []tls.Certificate{cert},
        RootCAs:      certPool,
    })
 
    dialOpt := grpc.WithTransportCredentials(transportCreds)
 
    conn, err := grpc.Dial("localhost:8080", dialOpt)
    if err != nil {
        log.Fatalf("Dial failed:%v", err)
    }
    defer conn.Close()
 
    client := api.NewHelloServiceClient(conn)
    resp1, err := client.SayHello(context.Background(), &api.HelloRequest{
        Greeting: "Hello Server 1 !!",
        Infos:    map[string]string{"hello": "world"},
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("SayHello Resp1:%+v", resp1)
 
    resp2, err := client.SayHello(context.Background(), &api.HelloRequest{
        Greeting: "Hello Server 2 !!",
    })
    if err != nil {
        log.Fatalf("%v", err)
    }
    log.Printf("SayHello Resp2:%+v", resp2)
 
    // 服務器端流式 RPC;
    recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"})
    if err != nil {
        log.Fatalf("ListHello err: %v", err)
    }
 
    for {
        //Recv() 方法接收服務端消息,默認每次Recv()最大消息長度爲`1024*1024*4`bytes(4M)
        resp, err := recvListHello.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
 
        log.Printf("ListHello Server Resp: %v", resp.Reply)
    }
    //能夠使用CloseSend()關閉stream,這樣服務端就不會繼續產生流消息
    //調用CloseSend()後,若繼續調用Recv(),會從新激活stream,接着以前結果獲取消息
 
    // 客戶端流式 RPC;
    sayMoreClient, err := client.SayMoreHello(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for i := 0; i < 3; i++ {
        sayMoreClient.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)})
    }
    //關閉流並獲取返回的消息
    sayMoreResp, err := sayMoreClient.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply)
 
    // 雙向流式 RPC;
    sayHelloChat, err := client.SayHelloChat(context.Background())
    if err != nil {
        log.Fatal(err)
    }
 
    for i := 0; i < 3; i++ {
        err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)})
        if err != nil {
            log.Fatalf("stream request err: %v", err)
        }
        res, err := sayHelloChat.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("SayHelloChat get stream err: %v", err)
        }
        // 打印返回值
        log.Printf("SayHelloChat Server Say: %v", res.Greeting)
 
    }
 
}

8.運行結果以下:

D:\GoProject\src\gogrpcstream\server>go run main.go
2021/01/05 17:19:12 Server Start...
2021/01/05 17:20:00 Client Greeting:Hello Server 1 !!
2021/01/05 17:20:00 Client Info:map[hello:world]
2021/01/05 17:20:00 Client Greeting:Hello Server 2 !!
2021/01/05 17:20:00 Client Info:map[]
2021/01/05 17:20:00 Client Say: Hello Server List Hello
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1
2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1
2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
D:\GoProject\src\gogrpcstream\client>go run main.go
2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!"  details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}}
2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!"  details:{[type.googleapis.com/api.Error]:{msg:"Bad Request"  msg:"Wrong Info Msg"}}
2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1
2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2
2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3
2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2
2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3

證書 能夠利用MySSL測試證書生成工具生成兩張證書 也能夠用openssl來實現。

下載地址 https://github.com/dz45693/gogrpcstrem.git

參考:

https://razeencheng.com/post/how-to-use-grpc-in-golang-03

相關文章
相關標籤/搜索