在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持雙向流調用,支持實時推送消息,這也是 gRPC的一大特色,且 gRPC 在對雙向流的控制支持上也是很是強大的。html
gRPC 有四種服務類型,分別是:簡單 RPC(Unary RPC)、服務端流式 RPC (Server streaming RPC)、客戶端流式 RPC (Client streaming RPC)、雙向流式 RPC(Bi-directional streaming RPC)。它們主要有如下特色:java
服務類型 | 特色 |
---|---|
簡單 RPC | 通常的rpc調用,傳入一個請求對象,返回一個返回對象 |
服務端流式 RPC | 傳入一個請求對象,服務端能夠返回多個結果對象 |
客戶端流式 RPC | 客戶端傳入多個請求對象,服務端返回一個結果對象 |
雙向流式 RPC | 結合客戶端流式RPC和服務端流式RPC,能夠傳入多個請求對象,返回多個結果對象 |
gRPC 通訊是基於 HTTP/2 實現的,它的雙向流映射到 HTTP/2 流。HTTP/2 具備流的概念,流是爲了實現HTTP/2的多路複用。流是服務器和客戶端在HTTP/2鏈接內用於交換幀數據的獨立雙向序列,邏輯上可看作一個較爲完整的交互處理單元,即表達一次完整的資源請求、響應數據交換流程;一個業務處理單元,在一個流內進行處理完畢,這個流生命週期完結。git
特色以下:github
摘自 HTTP/2筆記之流和多路複用 by 聶永服務器
咱們在前文中編寫的RPC屬於簡單RPC,沒有包含流調用,下面直接講一下雙向流,根據第二小節列舉的四種服務類型,若是咱們掌握了簡單RPC和雙向流RPC,那麼服務端流式 RPC和客戶端流式 RPC天然也就沒有問題了。dom
這裏咱們繼續使用前文的代碼,要實現的目標是一次給多個貓洗澡。async
① 首先在 LuCat.proto
定義兩個rpc,一個 Count 用於統計貓的數量,一個 雙向流 RPC BathTheCat 用於給貓洗澡ide
syntax = "proto3"; option csharp_namespace = "AspNetCoregRpcService"; import "google/protobuf/empty.proto"; package LuCat; //定義包名 //定義服務 service LuCat{ //定義給貓洗澡雙向流rpc rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp); //定義統計貓數量簡單rpc rpc Count(google.protobuf.Empty) returns (CountCatResult); } message SuckingCatResult{ string message=1; } message BathTheCatReq{ int32 id=1; } message BathTheCatResp{ string message=1; } message CountCatResult{ int32 Count=1; }
② 添加服務的實現google
這裏安利下Resharper,很是方便spa
private readonly ILogger<LuCatService> _logger; private static readonly List<string> Cats=new List<string>(){"英短銀漸層","英短金漸層","美短","藍貓","狸花貓","橘貓"}; private static readonly Random Rand=new Random(DateTime.Now.Millisecond); public LuCatService(ILogger<LuCatService> logger) { _logger = logger; } public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context) { var bathQueue=new Queue<int>(); while (await requestStream.MoveNext()) { //將要洗澡的貓加入隊列 bathQueue.Enqueue(requestStream.Current.Id); _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue."); } //遍歷隊列開始洗澡 while (bathQueue.TryDequeue(out var catId)) { await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一隻{Cats[catId]}洗了澡!" }); await Task.Delay(500);//此處主要是爲了方便客戶端能看出流調用的效果 } } public override Task<CountCatResult> Count(Empty request, ServerCallContext context) { return Task.FromResult(new CountCatResult() { Count = Cats.Count }); }
BathTheCat 方法會接收多個客戶端發來的CatId,而後將他們加入隊列中,等客戶端發送完成後開始依次洗澡並返回給客戶端。
③ 客戶端實現
隨機發送10個貓Id給服務端,而後接收10個洗澡結果。
var channel = GrpcChannel.ForAddress("https://localhost:5001"); var catClient = new LuCat.LuCatClient(channel); //獲取貓總數 var catCount = await catClient.CountAsync(new Empty()); Console.WriteLine($"一共{catCount.Count}只貓。"); var rand = new Random(DateTime.Now.Millisecond); var bathCat = catClient.BathTheCat(); //定義接收吸貓響應邏輯 var bathCatRespTask = Task.Run(async() => { await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Message); } }); //隨機給10個貓洗澡 for (int i = 0; i < 10; i++) { await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)}); } //發送完畢 await bathCat.RequestStream.CompleteAsync(); Console.WriteLine("客戶端已發送完10個須要洗澡的貓id"); Console.WriteLine("接收洗澡結果:"); //開始接收響應 await bathCatRespTask; Console.WriteLine("洗澡完畢");
④ 運行
能夠看到雙向流調用成功,客戶端發送了10個貓洗澡請求對象,服務端返回了10個貓洗澡結果對象。且是實時推送的,這就是 gRPC 的雙向流調用。
這裏你們能夠自行改進來演變成客戶端流式或者服務端流式調用。客戶端發送一個貓Id列表,而後服務端返回每一個貓洗澡結果,這就是服務端流式調用。客戶端依次發送貓Id,而後服務端一次性返回全部貓的洗澡結果(給全部貓洗澡看作是一個業務,返回這個業務的結果),就是客戶端流式調用。這裏我就再也不演示了。
gRPC 的流式調用支持對流進行主動取消的控制,進而能夠衍生出流超時限制等控制。
在流式調用是,能夠傳一個 CancellationToken 參數,它就是咱們用來對流進行取消控制的:
改造一下咱們在第四小節的代碼:
① 客戶端
var cts = new CancellationTokenSource(); //指定在2.5s後進行取消操做 cts.CancelAfter(TimeSpan.FromSeconds(2.5)); var bathCat = catClient.BathTheCat(cancellationToken: cts.Token); //定義接收吸貓響應邏輯 var bathCatRespTask = Task.Run(async() => { try { await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Message); } } catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) { Console.WriteLine("Stream cancelled."); } });
② 服務端
//遍歷隊列開始洗澡 while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId)) { _logger.LogInformation($"Cat {catId} Dequeue."); await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一隻{Cats[catId]}洗了澡!" }); await Task.Delay(500);//此處主要是爲了方便客戶端能看出流調用的效果 }
③ 運行
設置的是雙向流式調用2.5s後取消流,從客戶端調用結果看到,並無收到所有10個貓的洗澡返回結果,流就已經被取消了,這就是 gRPC 的流控制。
這裏流式調用能夠實現實時推送,服務端到客戶端或者客戶端到服務端短實時推送消息,可是這個和傳統意義上的長鏈接主動推送、廣播消息不同,無論你是服務端流式、客戶端流式仍是雙向流式,必需要由客戶端進行發起,經過客戶端請求來創建流通訊。
GRPC的四種服務類型 by twtydgo
HTTP/2筆記之流和多路複用 by 聶永