上一篇咱們介紹了rpc
最基本的應用,今天咱們來看看rpc
的另一個數據交互方式streaming rpc
,也就是流式接口。html
streaming rpc
相比於simple rpc
來講能夠很好的解決一個接口發送大量數據的場景。git
好比一個訂單導出的接口有20萬條記錄,若是使用simple rpc
來實現的話。那麼咱們須要一次性接收到20萬記錄才能進行下一步的操做。可是若是咱們使用streaming rpc
那麼咱們就能夠接收一條記錄處理一條記錄,直到因此的數據傳輸完畢。這樣能夠較少服務器的瞬時壓力,也更有及時性github
下面們來看看streaming rpc
具體是怎麼交互的。golang
syntax = "proto3"; package proto; message Order { int32 id = 1; string orderSn = 2; string date = 3; } message OrderList{ Order order = 1; } message OrderSearchParams { } message Image{ string fileName = 1; string file = 2; } message ImageList{ Image image = 1; } message uploadResponse{ } message SumData{ int32 number = 1; } service StreamService { rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服務端流式 rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客戶端流式 rpc SumData(stream SumData) returns (stream SumData){}; //雙向流式 }
這裏定義了三個方法服務器
package main import ( "google.golang.org/grpc" "iris-grpc-example/proto" "log" "net" ) type StreamServices struct {} func main() { server := grpc.NewServer() proto.RegisterStreamServiceServer(server, &StreamServices{}) lis, err := net.Listen("tcp", "127.0.0.1:9528") if err != nil { log.Fatalf("net.Listen err: %v", err) } server.Serve(lis) } func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error { return nil } func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error { return nil } func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error { return nil }
package main import ( "github.com/kataras/iris/v12" "google.golang.org/grpc" "iris-grpc-example/proto" "log" ) var streamClient proto.StreamServiceClient func main() { app := iris.New() app.Logger().SetLevel("debug") //debug app.Handle("GET", "/testOrderList", orderList) app.Handle("GET", "/testUploadImage", uploadImage) app.Handle("GET", "/testSumData", sumData) app.Run(iris.Addr("127.0.0.1:8080")) } func init() { connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure()) if err != nil { log.Fatalln(err) } streamClient = proto.NewStreamServiceClient(connect) } func orderList(ctx iris.Context) { } func uploadImage(ctx iris.Context) { } func sumData(ctx iris.Context) { }
按照proto
中的約定,先實現接口並註冊一個服務。接下來咱們依次來實現三個不一樣的流式方法。併發
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error { for i := 0; i <= 10; i++ { order := proto.Order{ Id: int32(i), OrderSn: time.Now().Format("20060102150405") + "order_sn", Date: time.Now().Format("2006-01-02 15:04:05"), } err := stream.Send(&proto.StreamOrderList{ Order: &order, }) if err != nil { return err } } return nil }
gRPC爲咱們提供一個流的發送方法。send
,這樣咱們能夠很簡單的以流的方式傳遞數據。
如今咱們來查看streaming.pb.go
中的send
app
func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error { return x.ServerStream.SendMsg(m) }
能夠看到最終是使用ServerStream.SendMsg
,查看源碼,能夠發現,最終是使用了一個結構體。tcp
type serverStream struct { ctx context.Context ...... maxReceiveMessageSize int maxSendMessageSize int ...... }
這裏咱們關心兩個值,最大可接收大小,最大發送大小。而再SendMsg
中也有對於的大小判斷,因此發送的消息大小不是無限制的。學習
// TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) }
咱們能夠服務端建立server
的時候經過server := grpc.NewServer(grpc.MaxSendMsgSize())
來指定大小,有能夠在客服端建立client
的時候經過connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))
的時候來指定,具體你們能夠下來本身詳細瞭解下配置項。google
func orderList(ctx iris.Context) { stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{}) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for { res, err := stream.Recv() if err == io.EOF { break } if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } ctx.JSON(res) log.Println(res) } }
這裏在for
循環中去讀取數據,直到取到一個io.EOF
的結束錯誤佔位符。
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error { for { res,err := stream.Recv() //接收消息結束,發送結果,並關閉 if err == io.EOF { return stream.SendAndClose(&proto.UploadResponse{}) } if err !=nil { return err } fmt.Println(res) } return nil }
能夠看到這裏咱們一樣使用for
結合stream.Recv()
來接收數據流,可是這裏咱們多一個SendAndClose
,表示服務器已經接收消息結束,併發生一個正確的響應給客戶端。
func uploadImage(ctx iris.Context) { stream,err := streamClient.UploadFile(context.Background()) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for i:=1;i<=10 ; i++ { img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"} images := &proto.StreamImageList{Image:img} err := stream.Send(images) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } } //發送完畢 關閉並獲取服務端返回的消息 resp, err := stream.CloseAndRecv() if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } ctx.JSON(map[string]interface{}{"result": resp,"message":"success"}) log.Println(resp) }
而在客戶端發送數據完畢的時候須要使用CloseAndRecv
須要接收服務端接收完畢的通知以及關閉當前通道。
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error { i := 0 for { err := stream.Send(&proto.StreamSumData{Number: int32(i)}) if err != nil { return err } res, err := stream.Recv() if err == io.EOF { return nil } log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number) i++ } }
服務端在發送消息的同時,並接收服務端發送的消息。
func sumData(ctx iris.Context) { stream, err := streamClient.SumData(context.Background()) if err != nil { ctx.JSON(map[string]string{ "err": err.Error(), }) return } for i := 1; i <= 10; i++ { err = stream.Send(&proto.StreamSumData{Number: int32(i)}) if err == io.EOF { break } if err != nil { return } res, err := stream.Recv() if err == io.EOF { break } if err != nil { return } log.Printf("res number:%d", res.Number) } stream.CloseSend() return }
上面咱們能夠看到。客戶端有一個執行斷開鏈接的標識CloseSend()
,而服務器沒有,由於服務端斷開鏈接是隱式的,咱們只須要退出循環便可斷開鏈接。能夠靈活的控制。
上面就是go-gRPC
的流式接口。只是一個簡單的例子。若有不妥的地方歡迎指出。感謝。