gRPC主要有4種請求和響應模式,分別是簡單模式(Simple RPC)
、服務端流式(Server-side streaming RPC)
、客戶端流式(Client-side streaming RPC)
、和雙向流式(Bidirectional streaming RPC)
。html
1.簡單模式(Simple RPC)
:客戶端發起請求並等待服務端響應。git
2.服務端流式(Server-side streaming RPC)
:客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到裏面沒有任何消息。 場景:.客戶端要獲取某原油股的實時走勢,客戶端發送一個請求, 服務端實時返回該股票的走勢github
3.客戶端流式(Client-side streaming RPC)
:與服務端數據流模式相反,此次是客戶端源源不斷的向服務端發送數據流,而在發送結束後,由服務端返回一個響應。情景模擬:客戶端大量數據上傳到服務端golang
4.雙向流式(Bidirectional streaming RPC)
:雙方使用讀寫流去發送一個消息序列,兩個流獨立操做,雙方能夠同時發送和同時接收。 情景模擬:雙方對話(能夠一問一答、一問多答、多問一答,形式靈活)api
從上面的定義不難看出,用stream
能夠定義一個流式消息。下面咱們就經過實例來演示一下流式通訊的使用方法。服務器
1.首先/api/hello.proto
[以下], 而且生成新的api/hello.pb.go
代碼。app
syntax = "proto3"; package api; // Any消息類型容許您將消息做爲嵌入類型,而不須要它們 .proto定義。Any包含任意序列化的消息(字節),以及一個URL,該URL充當該消息的全局惟一標識符並解析爲該消息的類型。要使用Any類型,你須要導入google/protobuf/any.proto. import "google/protobuf/any.proto"; message HelloRequest { string greeting = 1; map<string, string> infos = 2; } message HelloResponse { string reply = 1; repeated google.protobuf.Any details = 2; } service HelloService { rpc SayHello(HelloRequest) returns (HelloResponse){} rpc ListHello(HelloRequest) returns (stream HelloResponse) {} rpc SayMoreHello(stream HelloRequest) returns (HelloResponse) {} rpc SayHelloChat(stream HelloRequest) returns (stream HelloRequest) {} } message Hello { string msg = 1; } message Error { repeated string msg = 1; }
2.編譯指令:tcp
protoc -ID:\Go\include -I. --go_out=plugins=grpc:. ./api/api.proto
3.在生成的代碼api.hello.go中,咱們能夠看到客戶端接口以下:ide
type HelloServiceServer interface { SayHello(context.Context, *HelloRequest) (*HelloResponse, error) ListHello(*HelloRequest, HelloService_ListHelloServer) error SayMoreHello(HelloService_SayMoreHelloServer) error SayHelloChat(HelloService_SayHelloChatServer) error }
4.接口的實現 server/service/service.go以下:工具
package service import ( "context" "fmt" "io" "log" "time" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" api "gogrpcstream/api" ) type SayHelloServer struct{} func (s *SayHelloServer) SayHello(ctx context.Context, in *api.HelloRequest) (res *api.HelloResponse, err error) { log.Printf("Client Greeting:%s", in.Greeting) log.Printf("Client Info:%v", in.Infos) var an *any.Any if in.Infos["hello"] == "world" { an, err = ptypes.MarshalAny(&api.Hello{Msg: "Good Request"}) } else { an, err = ptypes.MarshalAny(&api.Error{Msg: []string{"Bad Request", "Wrong Info Msg"}}) } if err != nil { return } return &api.HelloResponse{ Reply: "Hello World !!", Details: []*any.Any{an}, }, nil } // 服務器端流式 RPC, 接收一次客戶端請求,返回一個流 func (s *SayHelloServer) ListHello(in *api.HelloRequest, stream api.HelloService_ListHelloServer) error { log.Printf("Client Say: %v", in.Greeting) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 1"}) time.Sleep(1 * time.Second) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 2"}) time.Sleep(1 * time.Second) stream.Send(&api.HelloResponse{Reply: "ListHello Reply " + in.Greeting + " 3"}) time.Sleep(1 * time.Second) return nil } // 客戶端流式 RPC, 客戶端流式請求,服務器可返回一次 func (s *SayHelloServer) SayMoreHello(stream api.HelloService_SayMoreHelloServer) error { // 接受客戶端請求 for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("SayMoreHello Client Say: %v", req.Greeting) } // 流讀取完成後,返回 return stream.SendAndClose(&api.HelloResponse{Reply: "SayMoreHello Recv Muti Greeting"}) } //雙向 func (s *SayHelloServer) SayHelloChat(stream api.HelloService_SayHelloChatServer) error { n := 1 for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } err = stream.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Server Say Hello %d", n)}) if err != nil { return err } n++ log.Printf("SayHelloChat Client Say: %v", req.Greeting) } return nil }
5. server/main.go 服務端實現:
package main import ( "crypto/tls" "crypto/x509" "io/ioutil" "log" "net" "google.golang.org/grpc/credentials" "google.golang.org/grpc" api "gogrpcstream/api" sv "gogrpcstream/server/service" ) func main() { lis, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } // 加載證書和密鑰 (同時能驗證證書與私鑰是否匹配) cert, err := tls.LoadX509KeyPair("../certs/server.pem", "../certs/server.key") if err != nil { panic(err) } // 將根證書加入證書詞 // 測試證書的根若是不加入可信池,那麼測試證書將視爲不惋惜,沒法經過驗證。 certPool := x509.NewCertPool() rootBuf, err := ioutil.ReadFile("../certs/ca.pem") if err != nil { panic(err) } if !certPool.AppendCertsFromPEM(rootBuf) { panic("fail to append test ca") } tlsConf := &tls.Config{ ClientAuth: tls.RequireAndVerifyClientCert, Certificates: []tls.Certificate{cert}, ClientCAs: certPool, } serverOpt := grpc.Creds(credentials.NewTLS(tlsConf)) grpcServer := grpc.NewServer(serverOpt) api.RegisterHelloServiceServer(grpcServer, &sv.SayHelloServer{}) log.Println("Server Start...") grpcServer.Serve(lis) }
6.客服端實現:client/main.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "io" "io/ioutil" "log" api "gogrpcstream/api" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) func main() { cert, err := tls.LoadX509KeyPair("../certs/client.pem", "../certs/client.key") if err != nil { panic(err) } // 將根證書加入證書池 certPool := x509.NewCertPool() bs, err := ioutil.ReadFile("../certs/ca.pem") if err != nil { panic(err) } if !certPool.AppendCertsFromPEM(bs) { panic("cc") } // 新建憑證 transportCreds := credentials.NewTLS(&tls.Config{ ServerName: "localhost", Certificates: []tls.Certificate{cert}, RootCAs: certPool, }) dialOpt := grpc.WithTransportCredentials(transportCreds) conn, err := grpc.Dial("localhost:8080", dialOpt) if err != nil { log.Fatalf("Dial failed:%v", err) } defer conn.Close() client := api.NewHelloServiceClient(conn) resp1, err := client.SayHello(context.Background(), &api.HelloRequest{ Greeting: "Hello Server 1 !!", Infos: map[string]string{"hello": "world"}, }) if err != nil { log.Fatal(err) } log.Printf("SayHello Resp1:%+v", resp1) resp2, err := client.SayHello(context.Background(), &api.HelloRequest{ Greeting: "Hello Server 2 !!", }) if err != nil { log.Fatalf("%v", err) } log.Printf("SayHello Resp2:%+v", resp2) // 服務器端流式 RPC; recvListHello, err := client.ListHello(context.Background(), &api.HelloRequest{Greeting: "Hello Server List Hello"}) if err != nil { log.Fatalf("ListHello err: %v", err) } for { //Recv() 方法接收服務端消息,默認每次Recv()最大消息長度爲`1024*1024*4`bytes(4M) resp, err := recvListHello.Recv() if err == io.EOF { break } if err != nil { log.Fatal(err) } log.Printf("ListHello Server Resp: %v", resp.Reply) } //能夠使用CloseSend()關閉stream,這樣服務端就不會繼續產生流消息 //調用CloseSend()後,若繼續調用Recv(),會從新激活stream,接着以前結果獲取消息 // 客戶端流式 RPC; sayMoreClient, err := client.SayMoreHello(context.Background()) if err != nil { log.Fatal(err) } for i := 0; i < 3; i++ { sayMoreClient.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)}) } //關閉流並獲取返回的消息 sayMoreResp, err := sayMoreClient.CloseAndRecv() if err != nil { log.Fatal(err) } log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply) // 雙向流式 RPC; sayHelloChat, err := client.SayHelloChat(context.Background()) if err != nil { log.Fatal(err) } for i := 0; i < 3; i++ { err = sayHelloChat.Send(&api.HelloRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)}) if err != nil { log.Fatalf("stream request err: %v", err) } res, err := sayHelloChat.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("SayHelloChat get stream err: %v", err) } // 打印返回值 log.Printf("SayHelloChat Server Say: %v", res.Greeting) } }
8.運行結果以下:
D:\GoProject\src\gogrpcstream\server>go run main.go 2021/01/05 17:19:12 Server Start... 2021/01/05 17:20:00 Client Greeting:Hello Server 1 !! 2021/01/05 17:20:00 Client Info:map[hello:world] 2021/01/05 17:20:00 Client Greeting:Hello Server 2 !! 2021/01/05 17:20:00 Client Info:map[] 2021/01/05 17:20:00 Client Say: Hello Server List Hello 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 0 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 1 2021/01/05 17:20:03 SayMoreHello Client Say: SayMoreHello Hello Server 2 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 0 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 1 2021/01/05 17:20:03 SayHelloChat Client Say: SayHelloChat Hello Server 2
D:\GoProject\src\gogrpcstream\client>go run main.go 2021/01/05 17:20:00 SayHello Resp1:reply:"Hello World !!" details:{[type.googleapis.com/api.Hello]:{msg:"Good Request"}} 2021/01/05 17:20:00 SayHello Resp2:reply:"Hello World !!" details:{[type.googleapis.com/api.Error]:{msg:"Bad Request" msg:"Wrong Info Msg"}} 2021/01/05 17:20:00 ListHello Server Resp: ListHello Reply Hello Server List Hello 1 2021/01/05 17:20:01 ListHello Server Resp: ListHello Reply Hello Server List Hello 2 2021/01/05 17:20:02 ListHello Server Resp: ListHello Reply Hello Server List Hello 3 2021/01/05 17:20:03 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 1 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 2 2021/01/05 17:20:03 SayHelloChat Server Say: SayHelloChat Server Say Hello 3
證書 能夠利用MySSL測試證書生成工具生成兩張證書 也能夠用openssl來實現。
下載地址 https://github.com/dz45693/gogrpcstrem.git
參考: