原文地址:帶入gRPC:gRPC Streaming, Client and Servergit
本章節將介紹 gRPC 的流式,分爲三種類型:github
任何技術,由於有痛點,因此纔有了存在的必要性。若是您想要了解 gRPC 的流式調用,請繼續golang
gRPC Streaming 是基於 HTTP/2 的,後續章節再進行詳細講解segmentfault
流式爲何要存在呢,是 Simple RPC 有什麼問題嗎?經過模擬業務場景,可得知在使用 Simple RPC 時,有以下問題:服務器
天天早上 6 點,都有一批百萬級別的數據集要同從 A 同步到 B,在同步的時候,會作一系列操做(歸檔、數據分析、畫像、日誌等)。這一次性涉及的數據量確實大tcp
在同步完成後,也有人立刻會去查閱數據,爲了新的一天籌備。也符合實時性。ide
二者相較下,這個場景下更適合使用 Streaming RPCgoogle
在講解具體的 gRPC 流式代碼時,會着重在第一節講解,由於三種模式實際上是不一樣的組合。但願你可以注重理解,觸類旁通,其實都是同樣的知識點 👍spa
$ tree go-grpc-example go-grpc-example ├── client │ ├── simple_client │ │ └── client.go │ └── stream_client │ └── client.go ├── proto │ ├── search.proto │ └── stream.proto └── server ├── simple_server │ └── server.go └── stream_server └── server.go
增長 stream_server、stream_client 存放服務端和客戶端文件,proto/stream.proto 用於編寫 IDL日誌
在 proto 文件夾下的 stream.proto 文件中,寫入以下內容:
syntax = "proto3"; package proto; service StreamService { rpc List(StreamRequest) returns (stream StreamResponse) {}; rpc Record(stream StreamRequest) returns (StreamResponse) {}; rpc Route(stream StreamRequest) returns (stream StreamResponse) {}; } message StreamPoint { string name = 1; int32 value = 2; } message StreamRequest { StreamPoint pt = 1; } message StreamResponse { StreamPoint pt = 1; }
注意關鍵字 stream,聲明其爲一個流方法。這裏共涉及三個方法,對應關係爲
package main import ( "log" "net" "google.golang.org/grpc" pb "github.com/EDDYCJY/go-grpc-example/proto" ) type StreamService struct{} const ( PORT = "9002" ) func main() { server := grpc.NewServer() pb.RegisterStreamServiceServer(server, &StreamService{}) lis, err := net.Listen("tcp", ":"+PORT) if err != nil { log.Fatalf("net.Listen err: %v", err) } server.Serve(lis) } func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error { return nil } func (s *StreamService) Record(stream pb.StreamService_RecordServer) error { return nil } func (s *StreamService) Route(stream pb.StreamService_RouteServer) error { return nil }
寫代碼前,建議先將 gRPC Server 的基礎模板和接口給空定義出來。如有不清楚可參見上一章節的知識點
package main import ( "log" "google.golang.org/grpc" pb "github.com/EDDYCJY/go-grpc-example/proto" ) const ( PORT = "9002" ) func main() { conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure()) if err != nil { log.Fatalf("grpc.Dial err: %v", err) } defer conn.Close() client := pb.NewStreamServiceClient(conn) err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}}) if err != nil { log.Fatalf("printLists.err: %v", err) } err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}}) if err != nil { log.Fatalf("printRecord.err: %v", err) } err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}}) if err != nil { log.Fatalf("printRoute.err: %v", err) } } func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error { return nil } func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error { return nil } func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error { return nil }
服務器端流式 RPC,顯然是單向流,並代指 Server 爲 Stream 而 Client 爲普通 RPC 請求
簡單來說就是客戶端發起一次普通的 RPC 請求,服務端經過流式響應屢次發送數據集,客戶端 Recv 接收數據集。大體如圖:
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error { for n := 0; n <= 6; n++ { err := stream.Send(&pb.StreamResponse{ Pt: &pb.StreamPoint{ Name: r.Pt.Name, Value: r.Pt.Value + int32(n), }, }) if err != nil { return err } } return nil }
在 Server,主要留意 stream.Send
方法。它看上去能發送 N 次?有沒有大小限制?
type StreamService_ListServer interface { Send(*StreamResponse) error grpc.ServerStream } func (x *streamServiceListServer) Send(m *StreamResponse) error { return x.ServerStream.SendMsg(m) }
經過閱讀源碼,可得知是 protoc 在生成時,根據定義生成了各式各樣符合標準的接口方法。最終再統一調度內部的 SendMsg
方法,該方法涉及如下過程:
math.MaxInt32
),若超出則提示錯誤func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.List(context.Background(), r) if err != nil { return err } for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value) } return nil }
在 Client,主要留意 stream.Recv()
方法。什麼狀況下 io.EOF
?什麼狀況下存在錯誤信息呢?
type StreamService_ListClient interface { Recv() (*StreamResponse, error) grpc.ClientStream } func (x *streamServiceListClient) Recv() (*StreamResponse, error) { m := new(StreamResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil }
經過閱讀源碼,可得知:當流結束(調用了 Close)時,會出現 io.EOF
。而錯誤信息(err)基本都由另外一側反饋過來,所以進行平常處理和標記便可
運行 stream_server/server.go:
$ go run server.go
運行 stream_client/client.go:
$ go run client.go 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024
客戶端流式 RPC,單向流,客戶端經過流式發起屢次 RPC 請求給服務端,服務端發起一次響應給客戶端,大體如圖:
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error { for { r, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}}) } if err != nil { return err } log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value) } return nil }
多了一個從未見過的方法 stream.SendAndClose
,它是作什麼用的呢?
在這段程序中,咱們對每個 Recv 都進行了處理,當發現 io.EOF
(流關閉) 後,須要將最終的響應結果發送給客戶端,同時關閉正在另一側等待的 Recv
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Record(context.Background()) if err != nil { return err } for n := 0; n < 6; n++ { err := stream.Send(r) if err != nil { return err } } resp, err := stream.CloseAndRecv() if err != nil { return err } log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value) return nil }
stream.CloseAndRecv
和 stream.SendAndClose
是配套使用的流方法,相信聰明的你已經秒懂它的做用了
重啓 stream_server/server.go,再次運行 stream_client/client.go:
$ go run client.go 2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
$ go run server.go 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
雙向流式 RPC,顧名思義是雙向流。由客戶端以流式的方式發起請求,服務端一樣以流式的方式響應請求
首個請求必定是 Client 發起,但具體交互方式(誰先誰後、一次發多少、響應多少、何時關閉)根據程序編寫的方式來肯定(能夠結合協程)
所以圖示也變幻無窮,這裏就不放出來了
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error { n := 0 for { err := stream.Send(&pb.StreamResponse{ Pt: &pb.StreamPoint{ Name: "gPRC Stream Client: Route", Value: int32(n), }, }) if err != nil { return err } r, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } n++ log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value) } return nil }
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error { stream, err := client.Route(context.Background()) if err != nil { return err } for n := 0; n <= 6; n++ { err = stream.Send(r) if err != nil { return err } resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value) } stream.CloseSend() return nil }
重啓 stream_server/server.go,再次運行 stream_client/client.go:
$ go run server.go 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
$ go run client.go 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6
在本文共介紹了三類流的交互方式,能夠根據實際的業務場景去選擇合適的方式。會事半功倍哦 🎑