【.NET Core項目實戰-統一認證平臺】開篇及目錄索引html
RPC是「遠程調用(Remote Procedure Call)」的一個名稱的縮寫,並非任何規範化的協議,也不是大衆都認知的協議標準,咱們更多時候使用時都是建立的自定義化(例如Socket,Netty)的消息方式進行調用,相比http協議,咱們省掉了很多http中無用的消息內容。所以不少系統內部調用仍然採用自定義化的RPC調用模式進行通訊,畢竟速度和性能是內網的關鍵指標之一,而標準化和語義無關性在外網中舉足輕重。因此,爲什麼API網關沒法工做在RPC上,由於它沒有一個像HTTP/HTTPS那樣的通用標準。nginx
CzarRpc是做者基於Dotnetty實現的RPC通信框架,參考了Surging
和Tars.Net
優秀設計,目前正在內部使用中,下面就CzarRpc調用方式作一個簡單介紹,測試結構以下:
git
一、服務接口github
新建一個Czar.Rpc.Common
類庫,首先須要引用Czar.Rpc
Nuget包。web
Install-Package Czar.Rpc
而後定義測試接口IHelloRpc.cs
,也是目前支持的調用方式。sql
using Czar.Rpc.Attributes; using Czar.Rpc.Exceptions; using Czar.Rpc.Metadata; using System; using System.Collections.Generic; using System.Threading.Tasks; namespace Czar.Rpc.Common { /// <summary> /// 測試Rpc實體 /// </summary> [BusinessExceptionInterceptor] [CzarRpc("Demo.Rpc.Hello")] public interface IHelloRpc: IRpcBaseService { string Hello(int no, string name); void HelloHolder(int no, out string name); Task<string> HelloTask(int no, string name); ValueTask<string> HelloValueTask(int no, string name); [CzarOneway] void HelloOneway(int no, string name); Task TestBusinessExceptionInterceptor(); DemoModel HelloModel(int D1, string D2, DateTime D3); Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3); DemoModel HelloSendModel(DemoModel model); DemoModel HelloSendModelParm(string name,DemoModel model); List<DemoModel> HelloSendModelList(List<DemoModel> model); } public class DemoModel { /// <summary> /// 測試1 /// </summary> public int T1 { get; set; } /// <summary> /// 測試2 /// </summary> public string T2 { get; set; } /// <summary> /// 測試3 /// </summary> public DateTime T3 { get; set; } public ChildModel Child { get; set; } } public class ChildModel { public string C1 { get; set; } } }
2.服務端json
新建一個控制檯程序Czar.Rpc.Server
,而後實現服務接口,由於都是測試數據,因此就隨意實現了方法。c#
HelloRpcServer.cs
後端
using Czar.Rpc.Exceptions; using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Linq; using System.Net; using Czar.Rpc.Common; namespace Demo.Rpc.Server { public class HelloRpcServer: IHelloRpc { public EndPoint CzarEndPoint { get; set; } public string Hello(int no, string name) { string result = $"{no}: Hi, {name}"; Console.WriteLine(result); return result + " callback"; } public void HelloHolder(int no, out string name) { name = no.ToString() + " out"; } public void HelloOneway(int no, string name) { /* 耗時操做 */ Console.WriteLine($"From oneway - {no}: Hi, {name}"); } public Task<string> HelloTask(int no, string name) { return Task.FromResult(Hello(no, name)); } public ValueTask<string> HelloValueTask(int no, string name) { return new ValueTask<string>(Hello(no, name)); } public Task TestBusinessExceptionInterceptor() { throw new BusinessException() { CzarCode = "1", CzarMessage = "test" }; } public DemoModel HelloModel(int D1, string D2, DateTime D3) { return new DemoModel() { T1 = D1 + 1, T2 = D2 + "2", T3 = D3.AddDays(1) }; } public async Task<DemoModel> HelloModelAsync(int D1, string D2, DateTime D3) { return await Task.FromResult( new DemoModel() { T1 = D1 + 1, T2 = D2 + "77777", T3 = D3.AddDays(1) } ); } public DemoModel HelloSendModel(DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); return model; } public DemoModel HelloSendModelParm(string name, DemoModel model) { model.T1 = model.T1 + 10; model.T2 = model.T2 + "11"; model.T3 = model.T3.AddDays(12); if (model.Child != null) { model.Child.C1 = name+"說:"+ model.Child.C1; } return model; } public List<DemoModel> HelloSendModelList(List<DemoModel> model) { return model.Select(t => new DemoModel() { T1=t.T1+10,T2=t.T2+"13",T3=t.T3.AddYears(1),Child=t.Child }).ToList(); } } }
而後啓動服務端監聽。緩存
class Program { static void Main(string[] args) { var host = new HostBuilder() .ConfigureHostConfiguration(i => i.AddJsonFile("CzarConfig.json")) .ConfigureLogging((hostContext, configLogging) => { configLogging.AddConsole(); }) .UseCodec<JsonCodec>() .UseLibuvTcpHost() .UseProxy() .UseConsoleLifetime() .Build(); host.RunAsync().Wait(); } }
啓用外部使用CzarConfig.json的配置文件,注意須要設置成始終複製。
{ "CzarHost": { "Port": 7711, //監聽端口 "QuietPeriodSeconds": 2, //退出靜默時間 DotNetty特性 "ShutdownTimeoutSeconds": 2, //關閉超時時間 DotNetty特性 "IsSsl": "false", //是否啓用 SSL, 客戶端須要保持一致 "PfxPath": "cert/datasync.pfx", //證書 "PfxPassword": "123456" //證書密鑰 } }
到此服務器端搭載完成。
三、客戶端
新建客戶端控制檯程序Czar.Rpc.Client
,而後配置Rpc調用信息。
{ "CzarHost": { "ProxyEndPoint": true, //是否啓用動態服務地址,就是指定服務端IP "IsSsl": "false", //是否啓用SSL "PfxPath": "cert/datasync.pfx", //證書 "PfxPassword": "123456", //證書密鑰 "ClientConfig": { //客戶端配置 "Demo.Rpc.Hello": { //對應服務[CzarRpc("Demo.Rpc.Hello")] 值 "Host": "127.0.0.1", //服務端IP 若是ProxyEndPoint=false 時使用 "Port": 7711, //服務端端口 若是ProxyEndPoint=false 時使用 "Timeout": 10, //調用超時時間 "WriterIdleTimeSeconds";30 //空閒超時時間,默認爲30秒,非內網環境建議設置成5分鐘內。 } } } }
如今開始啓用客戶端信息。
class Program { public static IServiceProvider service; public static IConfiguration config; static async Task Main(string[] args) { try { var builder = new ConfigurationBuilder(); config = builder.AddJsonFile("CzarConfig.json").Build(); service = new ServiceCollection() .AddSingleton(config) .AddLogging(j => j.AddConsole()) .AddLibuvTcpClient(config) .AddProxy() .BuildDynamicProxyServiceProvider(); var rpc = service.GetRequiredService<IHelloRpc>(); //使用的內部指定的服務器地址 rpc.CzarEndPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 7711); var result = string.Empty; string t = "基本調用"; result = rpc.Hello(18, t); Console.WriteLine(result); result = "無返回結果"; rpc.HelloHolder(1, out result); Console.WriteLine(result); result = await rpc.HelloTask(2, "異步任務"); Console.WriteLine(result); result = "單向"; rpc.HelloOneway(3, "單向調用"); Console.WriteLine(result); result = await rpc.HelloValueTask(4, "ValueTask任務"); Console.WriteLine(result); var modelResult = rpc.HelloModel(5, "返回實體", DateTime.Now); Console.WriteLine($"{modelResult.T1} {modelResult.T2} {modelResult.T3.ToLongDateString()}"); var modelResult1 = await rpc.HelloModelAsync(6, "返回Task實體", DateTime.Now); Console.WriteLine($"{modelResult1.T1} {modelResult1.T2} {modelResult1.T3.ToLongDateString()}"); var mm = new DemoModel() { T1 = 7, T2 = "傳實體返回實體", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類1" } }; var model2 = rpc.HelloSendModel(mm); Console.WriteLine($"{model2.T1} {model2.T2} {model2.T3.ToLongDateString()} {model2.Child.C1}"); var list = new List<DemoModel>(); var mm1 = new DemoModel() { T1 = 8, T2 = "傳List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類2" } }; var mm3 = new DemoModel() { T1 = 9, T2 = "傳List返回List", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類3" } }; list.Add(mm1); list.Add(mm3); var list3 = rpc.HelloSendModelList(list); Console.WriteLine($"{list3[0].T1} {list3[0].T2} {list3[0].T3.ToLongDateString()} {list3[0].Child?.C1}"); var mm4 = new DemoModel() { T1 = 9, T2 = "HelloSendModelParm", T3 = DateTime.Now, Child = new ChildModel() { C1 = "子類4" } }; var dd = rpc.HelloSendModelParm("HelloSendModelParm", mm4); Console.WriteLine($"{dd.T1} {dd.T2} {dd.T3.ToLongDateString()} {dd.Child.C1}"); //異常調用 await rpc.TestBusinessExceptionInterceptor(); } catch (BusinessException e) { Console.WriteLine($"CzarCode:{e.CzarCode} CzarMessage:{e.CzarMessage}"); } catch (Exception ex) { Console.WriteLine(ex); } Console.ReadLine(); } }
如今整個RPC調用搭建完畢,而後分別啓動服務器端和客戶端,就能夠看到屏幕輸出內容以下。
客戶端輸出:
服務器端輸出:
至此整個CzarRpc的基本使用已經介紹完畢,感興趣的朋友能夠自行測試。
有了CzarRpc
的通信框架後,如今在Ocelot
上實現Rpc
功能簡直易如反掌,如今開始添加咱們的Rpc
中間件,也讓咱們擴展的網關靈活起來。
還記得我介紹網關篇時添加中間件的步驟嗎?若是不記得的能夠先回去回顧下。
首先如何讓網關知道這個後端調用是http
仍是Rpc
呢?這時應該會想到Ocelot
路由配置裏的DownstreamScheme
,能夠在這裏判斷咱們定義的是http
仍是rpc
便可。同時咱們但願以前定義的全部中間件都生效,最後一步請求時若是配置下端路由rpc
,使用rpc
調用,不然使用http
調用,這樣能夠重複利用以前全部的中間件功能,減小重複開發。
在以前的開發的自定義限流和自定義受權中間件開發中,咱們知道開發完的中間件放到哪裏使用,這裏就不介紹原理了,直接添加到BuildCzarOcelotPipeline
裏以下代碼。
public static OcelotRequestDelegate BuildCzarOcelotPipeline(this IOcelotPipelineBuilder builder, OcelotPipelineConfiguration pipelineConfiguration) { // 註冊一個全局異常 builder.UseExceptionHandlerMiddleware(); // 若是請求是websocket使用單獨的管道 builder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest, app => { app.UseDownstreamRouteFinderMiddleware(); app.UseDownstreamRequestInitialiser(); app.UseLoadBalancingMiddleware(); app.UseDownstreamUrlCreatorMiddleware(); app.UseWebSocketsProxyMiddleware(); }); // 添加自定義的錯誤管道 builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware); //使用自定義的輸出管道 builder.UseCzarResponderMiddleware(); // 下游路由匹配管道 builder.UseDownstreamRouteFinderMiddleware(); //增長自定義擴展管道 if (pipelineConfiguration.MapWhenOcelotPipeline != null) { foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline) { builder.MapWhen(pipeline); } } // 使用Http頭部轉換管道 builder.UseHttpHeadersTransformationMiddleware(); // 初始化下游請求管道 builder.UseDownstreamRequestInitialiser(); // 使用自定義限流管道 builder.UseRateLimiting(); //使用請求ID生成管道 builder.UseRequestIdMiddleware(); //使用自定義受權前管道 builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware); //根據請求判斷是否啓用受權來使用管道 if (pipelineConfiguration.AuthenticationMiddleware == null) { builder.UseAuthenticationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthenticationMiddleware); } //添加自定義限流中間件 2018-11-18 金焰的世界 builder.UseCzarClientRateLimitMiddleware(); //添加自定義受權中間件 2018-11-15 金焰的世界 builder.UseAhphAuthenticationMiddleware(); //啓用自定義的認證以前中間件 builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware); //是否使用自定義的認證中間件 if (pipelineConfiguration.AuthorisationMiddleware == null) { builder.UseAuthorisationMiddleware(); } else { builder.Use(pipelineConfiguration.AuthorisationMiddleware); } // 使用自定義的參數構建中間件 builder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware); // 使用負載均衡中間件 builder.UseLoadBalancingMiddleware(); // 使用下游地址建立中間件 builder.UseDownstreamUrlCreatorMiddleware(); // 使用緩存中間件 builder.UseOutputCacheMiddleware(); //判斷下游的是否啓用rpc通訊,切換到RPC處理 builder.MapWhen(context => context.DownstreamReRoute.DownstreamScheme.Equals("rpc", StringComparison.OrdinalIgnoreCase), app => { app.UseCzarRpcMiddleware(); }); //使用下游請求中間件 builder.UseCzaHttpRequesterMiddleware(); return builder.Build(); }
這裏是在最後請求前判斷使用的下游請求方式,若是DownstreamScheme
使用的rpc
,就使用rpc
中間件處理。
Rpc處理的完整邏輯是,如何從http請求中獲取想要解析的參數,這裏須要設置匹配的優先級,目前設計的優先級爲。
一、首先提取路由參數,若是匹配上就是用路由參數名稱爲key,值爲value,按順序組成第一批參數。
二、提取query參數,若有有值按順序組成第二批參數。
三、若是非Get請求,提取body內容,若是非空,組成第三批參數
四、從配置庫裏提取rpc路由調用的服務名稱和函數名稱,以及是否單向調用。
五、按照獲取的數據進行rpc調用並等待返回。
看了上面的設計是否是思路很清晰了呢?
一、rpc路由表設計
CREATE TABLE AhphReRouteRpcConfig ( RpcId int IDENTITY(1,1) NOT NULL, ReRouteId int, //路由表主鍵 ServantName varchar(100) NOT NULL, //調用的服務名稱 FuncName varchar(100) NOT NULL, //調用的方法名稱 IsOneway bit NOT NULL //是否單向調用 )
二、提取遠程調用方法
根據上游路由獲取遠程調用的配置項目
public interface IRpcRepository { /// <summary> /// 根據模板地址獲取RPC請求方法 /// </summary> /// <param name="UpUrl">上游模板</param> /// <returns></returns> Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl); } public class SqlServerRpcRepository : IRpcRepository { private readonly CzarOcelotConfiguration _option; public SqlServerRpcRepository(CzarOcelotConfiguration option) { _option = option; } /// <summary> /// 獲取RPC調用方法 /// </summary> /// <param name="UpUrl"></param> /// <returns></returns> public async Task<RemoteInvokeMessage> GetRemoteMethodAsync(string UpUrl) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"select T4.* from AhphGlobalConfiguration t1 inner join AhphConfigReRoutes T2 on T1.AhphId=t2.AhphId inner join AhphReRoute T3 on T2.ReRouteId=T3.ReRouteId INNER JOIN AhphReRouteRpcConfig T4 ON T3.ReRouteId=T4.ReRouteId where IsDefault=1 and T1.InfoStatus=1 AND T3.InfoStatus=1 AND UpstreamPathTemplate=@URL"; var result = await connection.QueryFirstOrDefaultAsync<RemoteInvokeMessage>(sql, new { URL = UpUrl }); return result; } } }
三、重寫返回結果
因爲rpc調用後是返回的Json封裝的信息,須要解析成對應的HttpContent。
using System.IO; using System.Net; using System.Net.Http; using System.Threading.Tasks; namespace Czar.Gateway.Rpc { public class RpcHttpContent : HttpContent { private string result; public RpcHttpContent(string result) { this.result = result; } public RpcHttpContent(object result) { this.result = Newtonsoft.Json.JsonConvert.SerializeObject(result); } protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) { var writer = new StreamWriter(stream); await writer.WriteAsync(result); await writer.FlushAsync(); } protected override bool TryComputeLength(out long length) { length = result.Length; return true; } } }
四、rpc中間件邏輯處理
有了前面的準備信息,如今基本能夠完成邏輯代碼的開發了,詳細的中間件代碼以下。
using Czar.Gateway.Errors; using Czar.Rpc.Clients; using Ocelot.Logging; using Ocelot.Middleware; using Ocelot.Responses; using System.Collections.Generic; using System.Net; using System.Threading.Tasks; namespace Czar.Gateway.Rpc.Middleware { public class CzarRpcMiddleware : OcelotMiddleware { private readonly OcelotRequestDelegate _next; private readonly IRpcClientFactory _clientFactory; private readonly ICzarRpcProcessor _czarRpcProcessor; public CzarRpcMiddleware(OcelotRequestDelegate next, IRpcClientFactory clientFactory, IOcelotLoggerFactory loggerFactory, ICzarRpcProcessor czarRpcProcessor) : base(loggerFactory.CreateLogger<CzarRpcMiddleware>()) { _next = next; _clientFactory = clientFactory; _czarRpcProcessor = czarRpcProcessor; } public async Task Invoke(DownstreamContext context) { var httpStatusCode = HttpStatusCode.OK; var _param = new List<object>(); //一、提取路由參數 var tmpInfo = context.TemplatePlaceholderNameAndValues; if (tmpInfo != null && tmpInfo.Count > 0) { foreach (var tmp in tmpInfo) { _param.Add(tmp.Value); } } //二、提取query參數 foreach (var _q in context.HttpContext.Request.Query) { _param.Add(_q.Value.ToString()); } //三、從body裏提取內容 if (context.HttpContext.Request.Method.ToUpper() != "GET") { context.DownstreamRequest.Scheme = "http"; var requert = context.DownstreamRequest.ToHttpRequestMessage(); if (requert.Content!=null) { var json = "{}"; json = await requert.Content.ReadAsStringAsync(); _param.Add(json); } } //從緩存裏提取 var req = await _czarRpcProcessor.GetRemoteMethodAsync(context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue); if (req != null) { req.Parameters = _param.ToArray(); var result = await _clientFactory.SendAsync(req, GetEndPoint(context.DownstreamRequest.Host, context.DownstreamRequest.Port)); OkResponse<RpcHttpContent> httpResponse; if (result.CzarCode == Czar.Rpc.Utilitys.RpcStatusCode.Success) { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result.CzarResult?.ToString())); } else { httpResponse = new OkResponse<RpcHttpContent>(new RpcHttpContent(result)); } context.HttpContext.Response.ContentType = "application/json"; context.DownstreamResponse = new DownstreamResponse(httpResponse.Data, httpStatusCode, httpResponse.Data.Headers, "OK"); } else {//輸出錯誤 var error = new InternalServerError($"請求路由 {context.HttpContext.Request.Path}未配置後端轉發"); Logger.LogWarning($"{error}"); SetPipelineError(context, error); } } private EndPoint GetEndPoint(string ipaddress, int port) { if (IPAddress.TryParse(ipaddress, out IPAddress ip)) { return new IPEndPoint(ip, port); } else { return new DnsEndPoint(ipaddress, port); } } } }
五、啓動Rpc客戶端配置
目前Rpc的客戶端配置咱們還沒啓動,只須要在AddCzarOcelot
中添加相關注入便可。
var service = builder.First(x => x.ServiceType == typeof(IConfiguration)); var configuration = (IConfiguration)service.ImplementationInstance; //Rpc應用 builder.AddSingleton<ICzarRpcProcessor, CzarRpcProcessor>(); builder.AddSingleton<IRpcRepository, SqlServerRpcRepository>(); builder.AddLibuvTcpClient(configuration);
六、配置客戶端
最後別忘了配置Rpc客戶端信息是否啓用證書信息,爲了配置信息的內容。
{ "CzarHost": { "ProxyEndPoint": true, "IsSsl": "false", "PfxPath": "cert/datasync.pfx", "PfxPassword": "bl123456", "ClientConfig": { "Demo.Rpc.Hello": { "Host": "127.0.0.1", "Port": 7711, "Timeout": 20 } } } }
如今讓網關集成Rpc功能所有配置完畢。
本次測試我在原有的網關基礎上,增長不一樣類型的Rpc調用,就按照不一樣維度測試Rpc調用功能,本次測試案例是創建在Czar.Rpc 服務端基礎上,正好能夠測試。
一、測試路由參數
請求路徑/hello/{no}/{name}
,調用的服務端方法Hello
,傳入的兩個參數分別是no ,name
。
能夠在服務器端添加斷點調試,發現確實接收到請求信息,並正常返回,下面是PostMan
測試結果。
二、使用Query方式傳遞參數
請求路徑/rpc/query
,調用的服務端方法仍是Hello
,參數分別是no ,name
。
三、使用Post方式傳遞Json
請求路徑/rpc/body
,調用的服務器方法是HelloSendModel
。
四、混合參數使用
請求的路徑/rpc/bodyparm/{name}
,調用的服務器端方法是HelloSendModelParm
。
全部的返回結果可自行調試測試,發現都能達到預期結果。
同時此網關仍是支持默認的http請求的,這裏就不一一測試了。
本篇我介紹了什麼是Rpc,以及Czar.Rpc的基本使用,而後使用Czar.Rpc框架集成到咱們基於Ocelot擴展網關中,並實現了不能方式的Rpc調用,能夠在幾乎不改變現有流程的狀況下很快速的集成進去,這也是Ocelot開發框架的魅力所在。
若是在使用過程當中有什麼問題或建議,能夠在.NET Core項目實戰交流羣(637326624)
中聯繫做者。
最後本文涉及的全部的源代碼可在https://github.com/jinyancao/czar.gateway中下載預覽。