go-grpc-流式接口(streaming rpc)

上一篇咱們介紹了rpc最基本的應用,今天咱們來看看rpc的另一個數據交互方式streaming rpc,也就是流式接口。html

streaming rpc相比於simple rpc來講能夠很好的解決一個接口發送大量數據的場景。git

好比一個訂單導出的接口有20萬條記錄,若是使用simple rpc來實現的話。那麼咱們須要一次性接收到20萬記錄才能進行下一步的操做。可是若是咱們使用streaming rpc那麼咱們就能夠接收一條記錄處理一條記錄,直到因此的數據傳輸完畢。這樣能夠較少服務器的瞬時壓力,也更有及時性github

下面們來看看streaming rpc具體是怎麼交互的。golang

IDL

syntax = "proto3";
package proto;

message Order {
    int32 id = 1;
    string orderSn = 2;
    string date = 3;
}
message OrderList{
    Order order = 1;
}
message OrderSearchParams {
}
message Image{
    string fileName = 1;
    string file = 2;
}
message ImageList{
    Image image = 1;
}
message uploadResponse{
}
message SumData{
    int32 number = 1;
}
service StreamService {
    rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服務端流式
    rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客戶端流式
    rpc SumData(stream SumData) returns (stream SumData){}; //雙向流式
}

這裏定義了三個方法服務器

  • OrderList 服務器流式,客戶端普通rpc調用
  • UploadFile 客戶端流式,服務端普通rpc
  • SumData 雙向流式

基礎結構

server
package main

import (
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
    "net"
)
type  StreamServices struct {}
func main()  {
    server := grpc.NewServer()
    proto.RegisterStreamServiceServer(server, &StreamServices{})

    lis, err := net.Listen("tcp", "127.0.0.1:9528")
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }
    server.Serve(lis)
}
func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error {
    return  nil
}

func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
    return  nil
}

func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error {
    return  nil
}
clietn
package main
import (
    "github.com/kataras/iris/v12"
    "google.golang.org/grpc"
    "iris-grpc-example/proto"
    "log"
)
var streamClient proto.StreamServiceClient
func main()  {
    app := iris.New()
    app.Logger().SetLevel("debug") //debug
    app.Handle("GET", "/testOrderList", orderList)
    app.Handle("GET", "/testUploadImage", uploadImage)
    app.Handle("GET", "/testSumData", sumData)
    app.Run(iris.Addr("127.0.0.1:8080"))
}
func init() {
    connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure())
    if err != nil {
        log.Fatalln(err)
    }
    streamClient = proto.NewStreamServiceClient(connect)
}

func orderList(ctx iris.Context)  {
    
}

func uploadImage(ctx iris.Context)  {

}

func sumData(ctx iris.Context)  {

}

按照proto中的約定,先實現接口並註冊一個服務。接下來咱們依次來實現三個不一樣的流式方法。併發

服務端流式 orderList
server
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error {
    for i := 0; i <= 10; i++ {
        order := proto.Order{
            Id:      int32(i),
            OrderSn: time.Now().Format("20060102150405") + "order_sn",
            Date:    time.Now().Format("2006-01-02 15:04:05"),
        }
        err := stream.Send(&proto.StreamOrderList{
            Order: &order,
        })
        if err != nil {
            return err
        }
    }
    return nil
}

gRPC爲咱們提供一個流的發送方法。send,這樣咱們能夠很簡單的以流的方式傳遞數據。
如今咱們來查看streaming.pb.go中的sendapp

func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error {
    return x.ServerStream.SendMsg(m)
}

能夠看到最終是使用ServerStream.SendMsg,查看源碼,能夠發現,最終是使用了一個結構體。tcp

type serverStream struct {
    ctx   context.Context
    ......

    maxReceiveMessageSize int
    maxSendMessageSize    int
    ......
}

這裏咱們關心兩個值,最大可接收大小,最大發送大小。而再SendMsg中也有對於的大小判斷,因此發送的消息大小不是無限制的。學習

// TODO(dfawley): should we be checking len(data) instead?
    if len(payload) > ss.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
    }

咱們能夠服務端建立server的時候經過server := grpc.NewServer(grpc.MaxSendMsgSize())來指定大小,有能夠在客服端建立client的時候經過connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))的時候來指定,具體你們能夠下來本身詳細瞭解下配置項。google

client
func orderList(ctx iris.Context) {
    stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{})
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })

            return
        }
        ctx.JSON(res)
        log.Println(res)
    }
}

這裏在for循環中去讀取數據,直到取到一個io.EOF的結束錯誤佔位符。

客戶端流式 uploadImage
server
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error {
    for  {
         res,err := stream.Recv()
         //接收消息結束,發送結果,並關閉
        if err == io.EOF {
            return stream.SendAndClose(&proto.UploadResponse{})
        }
        if err !=nil {
            return err
        }
        fmt.Println(res)
    }
    return nil
}

能夠看到這裏咱們一樣使用for結合stream.Recv()來接收數據流,可是這裏咱們多一個SendAndClose,表示服務器已經接收消息結束,併發生一個正確的響應給客戶端。

client
func uploadImage(ctx iris.Context) {
    stream,err := streamClient.UploadFile(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i:=1;i<=10 ; i++ {
        img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
        images := &proto.StreamImageList{Image:img}
        err := stream.Send(images)
        if err != nil {
            ctx.JSON(map[string]string{
                "err": err.Error(),
            })
            return
        }
    }
    //發送完畢 關閉並獲取服務端返回的消息
    resp, err := stream.CloseAndRecv()
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    ctx.JSON(map[string]interface{}{"result": resp,"message":"success"})
    log.Println(resp)
}

而在客戶端發送數據完畢的時候須要使用CloseAndRecv 須要接收服務端接收完畢的通知以及關閉當前通道。

雙向流式 uploadImage
server
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error {
    i := 0
    for {
        err := stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err != nil {
            return err
        }
        res, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number)
        i++
    }
}

服務端在發送消息的同時,並接收服務端發送的消息。

client
func sumData(ctx iris.Context) {
    stream, err := streamClient.SumData(context.Background())
    if err != nil {
        ctx.JSON(map[string]string{
            "err": err.Error(),
        })
        return
    }
    for i := 1; i <= 10; i++ {
        err = stream.Send(&proto.StreamSumData{Number: int32(i)})
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return
        }
        log.Printf("res number:%d", res.Number)
    }
    stream.CloseSend()
    return
}

上面咱們能夠看到。客戶端有一個執行斷開鏈接的標識CloseSend(),而服務器沒有,由於服務端斷開鏈接是隱式的,咱們只須要退出循環便可斷開鏈接。能夠靈活的控制。

上面就是go-gRPC的流式接口。只是一個簡單的例子。若有不妥的地方歡迎指出。感謝。

學習於-煎魚

期待一塊兒交流

qrcode_for_gh_60813539dc23_258.jpg

相關文章
相關標籤/搜索