上一篇介紹了簡單模式RPC
,當數據量大或者須要不斷傳輸數據時候,咱們應該使用流式RPC,它容許咱們邊處理邊傳輸數據。本篇先介紹服務端流式RPC
。git
服務端流式RPC
:客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到裏面沒有任何消息。github
1.客戶端要獲取某原油股的實時走勢,客戶端發送一個請求shell
2.服務端實時返回該股票的走勢服務器
新建server_stream.proto文件網絡
1.定義發送信息tcp
// 定義發送請求信息 message SimpleRequest{ // 定義發送的參數,採用駝峯命名方式,小寫加下劃線,如:student_name // 請求參數 string data = 1; }
2.定義接收信息.net
// 定義流式響應信息 message StreamResponse{ // 流式響應數據 string stream_value = 1; }
3.定義服務方式ListValuecode
服務端流式rpc,只要在響應數據前添加stream便可server
// 定義咱們的服務(可定義多個服務,每一個服務可定義多個接口) service StreamServer{ // 服務端流式rpc,在響應數據前添加stream rpc ListValue(SimpleRequest)returns(stream StreamResponse){}; }
4.編譯proto文件blog
進入server_stream.proto所在目錄,運行指令:
protoc --go_out=plugins=grpc:./ ./simple.proto
1.定義咱們的服務,並實現ListValue方法
// SimpleService 定義咱們的服務 type StreamService struct{} // ListValue 實現ListValue方法 func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 5; n++ { // 向流中發送消息, 默認每次send送消息最大長度爲`math.MaxInt32`bytes err := srv.Send(&pb.StreamResponse{ StreamValue: req.Data + strconv.Itoa(n), }) if err != nil { return err } } return nil }
初學者可能以爲比較迷惑,ListValue的參數和返回值是怎樣肯定的。其實這些都是編譯proto時生成的.pb.go文件中有定義,咱們只須要實現就能夠了。
2.啓動gRPC服務器
const ( // Address 監聽地址 Address string = ":8000" // Network 網絡通訊協議 Network string = "tcp" ) func main() { // 監聽本地端口 listener, err := net.Listen(Network, Address) if err != nil { log.Fatalf("net.Listen err: %v", err) } log.Println(Address + " net.Listing...") // 新建gRPC服務器實例 // 默認單次接收最大消息長度爲`1024*1024*4`bytes(4M),單次發送消息最大長度爲`math.MaxInt32`bytes // grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32)) grpcServer := grpc.NewServer() // 在gRPC服務器註冊咱們的服務 pb.RegisterStreamServerServer(grpcServer, &StreamService{}) //用服務器 Serve() 方法以及咱們的端口信息區實現阻塞等待,直到進程被殺死或者 Stop() 被調用 err = grpcServer.Serve(listener) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } }
運行服務端
go run server.go :8000 net.Listing...
1.建立調用服務端ListValue方法
// listValue 調用服務端的ListValue方法 func listValue() { // 建立發送結構體 req := pb.SimpleRequest{ Data: "stream server grpc ", } // 調用咱們的服務(ListValue方法) stream, err := grpcClient.ListValue(context.Background(), &req) if err != nil { log.Fatalf("Call ListStr err: %v", err) } for { //Recv() 方法接收服務端消息,默認每次Recv()最大消息長度爲`1024*1024*4`bytes(4M) res, err := stream.Recv() // 判斷消息流是否已經結束 if err == io.EOF { break } if err != nil { log.Fatalf("ListStr get stream err: %v", err) } // 打印返回值 log.Println(res.StreamValue) } }
2.啓動gRPC客戶端
// Address 鏈接地址 const Address string = ":8000" var grpcClient pb.StreamServerClient func main() { // 鏈接服務器 conn, err := grpc.Dial(Address, grpc.WithInsecure()) if err != nil { log.Fatalf("net.Connect err: %v", err) } defer conn.Close() // 創建gRPC鏈接 grpcClient = pb.NewStreamServerClient(conn) route() listValue() }
運行客戶端
go run client.go stream server grpc 0 stream server grpc 1 stream server grpc 2 stream server grpc 3 stream server grpc 4
客戶端不斷從服務端獲取數據
假如服務端不停發送數據,相似獲取股票走勢實時數據,客戶端能本身中止獲取數據嗎?
答案:能夠的
1.咱們把服務端的ListValue方法稍微修改
// ListValue 實現ListValue方法 func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 15; n++ { // 向流中發送消息, 默認每次send送消息最大長度爲`math.MaxInt32`bytes err := srv.Send(&pb.StreamResponse{ StreamValue: req.Data + strconv.Itoa(n), }) if err != nil { return err } log.Println(n) time.Sleep(1 * time.Second) } return nil }
2.再把客戶端調用ListValue方法的實現稍做修改,就能夠獲得結果了
// listValue 調用服務端的ListValue方法 func listValue() { // 建立發送結構體 req := pb.SimpleRequest{ Data: "stream server grpc ", } // 調用咱們的服務(Route方法) // 同時傳入了一個 context.Context ,在有須要時可讓咱們改變RPC的行爲,好比超時/取消一個正在運行的RPC stream, err := grpcClient.ListValue(context.Background(), &req) if err != nil { log.Fatalf("Call ListStr err: %v", err) } for { //Recv() 方法接收服務端消息,默認每次Recv()最大消息長度爲`1024*1024*4`bytes(4M) res, err := stream.Recv() // 判斷消息流是否已經結束 if err == io.EOF { break } if err != nil { log.Fatalf("ListStr get stream err: %v", err) } // 打印返回值 log.Println(res.StreamValue) break } //可使用CloseSend()關閉stream,這樣服務端就不會繼續產生流消息 //調用CloseSend()後,若繼續調用Recv(),會從新激活stream,接着以前結果獲取消息 stream.CloseSend() }
只須要調用CloseSend()
方法,就能夠關閉服務端的stream,讓它中止發送數據。值得注意的是,調用CloseSend()
後,若繼續調用Recv()
,會從新激活stream,接着當前的結果繼續獲取消息。
這能完美解決客戶端暫停
->繼續
獲取數據的操做。
本篇介紹了服務端流式RPC
的簡單實用,客戶端發起一個請求,服務端不停返回數據,直到服務端中止發送數據或客戶端主動中止接收數據爲止。下篇將介紹客戶端流式RPC
。
教程源碼地址:https://github.com/Bingjian-Zhu/go-grpc-example
參考:gRPC官方文檔中文版