上篇文章我介紹瞭如何在網關上增長自定義客戶端受權功能,從設計到編碼實現,一步一步詳細講解,相信你們也掌握了自定義中間件的開發技巧了,本篇咱們將介紹如何實現自定義客戶端的限流功能,來進一步完善網關的基礎功能。html
.netcore項目實戰交流羣(637326624),有興趣的朋友能夠在羣裏交流討論。mysql
限流就是爲了保證網關在高併發或瞬時併發時,在服務能承受範圍內,犧牲部分請求爲代價,保證系統的總體可用性而作的安全策略,避免單個服務影響總體網關的服務能力。
好比網關有商品查詢接口 ,能接受的極限請求是每秒100次查詢,若是此時不限流,可能由於瞬時請求太大,形成服務卡死或崩潰的狀況,這種狀況可使用Ocelot
客戶端全侷限流便可知足需求,如今又有一個需求,我須要把接口開放給A公司,他們也要查詢這個商品接口,這時A公司請求頻率也是咱們設置的每秒100次請求,顯然咱們不但願A公司有這麼高的請求頻率,我只會給A公司最大每秒一次的請求,那怎麼實現呢?這時咱們就沒法經過Ocelot
配置限流來進行自定義控制了,這塊就須要咱們增長自定義限流管道來實現功能。redis
下面咱們就該功能如何實現展開講解,但願你們先理解下功能需求,而後在延伸到具體實現。sql
限流這塊設計表結構和關係以下。
數據庫
主要有限流規則表、路由限流規則表、限流組表、限流組策略表、客戶端受權限流組表、客戶端白名單表組成,設計思想就是客戶端請求時先檢查是否在白名單,若是白名單不存在,就檢查是否在限流組裏,若是在限流組裏校驗限流的規則是什麼,而後比對這個規則和當前請求次數看是否可以繼續訪問,若是超過限流策略直接返回429狀態,不然路由到下端請求。c#
梳理下後發現流程不是很複雜,最起碼實現的思路很是清晰,而後咱們就運用上篇自定義受權中間件的方式來開發咱們第二個中間件,自定義限流中間件。後端
一、功能開啓配置緩存
網關應該支持自定義客戶端限流中間件是否啓用,由於一些小型項目是不須要對每一個客戶端進行單獨限流的,中型和大型項目纔有可能遇到自定義配置狀況,因此咱們須要在配置文件增長配置選項。在AhphOcelotConfiguration.cs
配置類中增長屬性,默認不開啓。安全
/// <summary> /// 金焰的世界 /// 2018-11-18 /// 是否開啓自定義限流,默認不開啓 /// </summary> public bool ClientRateLimit { get; set; } = false; /// <summary> /// 金焰的世界 /// 2018-11-18 /// 客戶端限流緩存時間,默認30分鐘 /// </summary> public int ClientRateLimitCacheTime { get; set; } = 1800;
那咱們如何把自定義的限流增長到網關流程裏呢?這塊咱們就須要訂製本身的限流中間件。併發
二、實現客戶端限流中間件
首先咱們定義一個自定義限流中間件AhphClientRateLimitMiddleware
,須要繼承OcelotMiddleware
,而後咱們要實現Invoke
方法,詳細代碼以下。
using Ctr.AhphOcelot.Configuration; using Ctr.AhphOcelot.Errors; using Ocelot.Logging; using Ocelot.Middleware; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.RateLimit.Middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 自定義客戶端限流中間件 /// </summary> public class AhphClientRateLimitMiddleware : OcelotMiddleware { private readonly IClientRateLimitProcessor _clientRateLimitProcessor; private readonly OcelotRequestDelegate _next; private readonly AhphOcelotConfiguration _options; public AhphClientRateLimitMiddleware(OcelotRequestDelegate next, IOcelotLoggerFactory loggerFactory, IClientRateLimitProcessor clientRateLimitProcessor, AhphOcelotConfiguration options) : base(loggerFactory.CreateLogger<AhphClientRateLimitMiddleware>()) { _next = next; _clientRateLimitProcessor = clientRateLimitProcessor; _options = options; } public async Task Invoke(DownstreamContext context) { var clientId = "client_cjy"; //使用默認的客戶端 if (!context.IsError) { if (!_options.ClientRateLimit) { Logger.LogInformation($"未啓用客戶端限流中間件"); await _next.Invoke(context); } else { //非認證的渠道 if (!context.DownstreamReRoute.IsAuthenticated) { if (context.HttpContext.Request.Headers.Keys.Contains(_options.ClientKey)) { clientId = context.HttpContext.Request.Headers[_options.ClientKey].First(); } } else {//認證過的渠道,從Claim中提取 var clientClaim = context.HttpContext.User.Claims.FirstOrDefault(p => p.Type == _options.ClientKey); if (!string.IsNullOrEmpty(clientClaim?.Value)) { clientId = clientClaim?.Value; } } //路由地址 var path = context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue; //一、校驗路由是否有限流策略 //二、校驗客戶端是否被限流了 //三、校驗客戶端是否啓動白名單 //四、校驗是否觸發限流及計數 if (await _clientRateLimitProcessor.CheckClientRateLimitResultAsync(clientId, path)) { await _next.Invoke(context); } else { var error = new RateLimitOptionsError($"請求路由 {context.HttpContext.Request.Path}觸發限流策略"); Logger.LogWarning($"路由地址 {context.HttpContext.Request.Path} 觸發限流策略. {error}"); SetPipelineError(context, error); } } } else { await _next.Invoke(context); } } } }
首先咱們來分析下咱們的代碼,爲了知道是哪一個客戶端請求了咱們網關,須要提取clientId
,分別從無需受權接口和須要受權接口兩個方式提取,若是提取不到值直接給定默認值,放到全侷限流裏,防止繞過限流策略。而後根據客戶端經過4步檢驗下是否容許訪問(後面會介紹這4步怎麼實現),若是知足限流策略直接返回限流錯誤提醒。
有了這個中間件,那麼如何添加到Ocelot的管道里呢?上一篇介紹的很是詳細,這篇我就不介紹了,自定義限流中間件擴展AhphClientRateLimitMiddlewareExtensions
,代碼以下。
using Ocelot.Middleware.Pipeline; using System; using System.Collections.Generic; using System.Text; namespace Ctr.AhphOcelot.RateLimit.Middleware { /// <summary> /// 金焰的世界 /// 2018-11-18 /// 限流中間件擴展 /// </summary> public static class AhphClientRateLimitMiddlewareExtensions { public static IOcelotPipelineBuilder UseAhphAuthenticationMiddleware(this IOcelotPipelineBuilder builder) { return builder.UseMiddleware<AhphClientRateLimitMiddleware>(); } } }
有了這個中間件擴展後,咱們就在管道的合適地方加入咱們自定義的中間件。咱們添加咱們自定義的管道擴展OcelotPipelineExtensions
,而後把自定義限流中間件加入到認證以後。
//添加自定義限流中間件 2018-11-18 金焰的世界 builder.UseAhphClientRateLimitMiddleware();
如今咱們完成了網關的擴展和應用,是時候把定義的IClientRateLimitProcessor
接口實現了 ,是否是感受作一箇中間件很簡單呢?並且每一步都是層層關聯,只要一步一步按照本身的想法往下寫就能實現。
三、結合數據庫實現校驗及緩存
首先咱們新建AhphClientRateLimitProcessor
類來實現接口,中間增長必要的緩存和業務邏輯,詳細代碼以下。
using Ctr.AhphOcelot.Configuration; using Ocelot.Cache; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.RateLimit { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 實現客戶端限流處理器 /// </summary> public class AhphClientRateLimitProcessor : IClientRateLimitProcessor { private readonly AhphOcelotConfiguration _options; private readonly IOcelotCache<ClientRoleModel> _ocelotCache; private readonly IOcelotCache<RateLimitRuleModel> _rateLimitRuleCache; private readonly IOcelotCache<AhphClientRateLimitCounter?> _clientRateLimitCounter; private readonly IClientRateLimitRepository _clientRateLimitRepository; private static readonly object _processLocker = new object(); public AhphClientRateLimitProcessor(AhphOcelotConfiguration options,IClientRateLimitRepository clientRateLimitRepository, IOcelotCache<AhphClientRateLimitCounter?> clientRateLimitCounter, IOcelotCache<ClientRoleModel> ocelotCache, IOcelotCache<RateLimitRuleModel> rateLimitRuleCache) { _options = options; _clientRateLimitRepository = clientRateLimitRepository; _clientRateLimitCounter = clientRateLimitCounter; _ocelotCache = ocelotCache; _rateLimitRuleCache = rateLimitRuleCache; } /// <summary> /// 校驗客戶端限流結果 /// </summary> /// <param name="clientid">客戶端ID</param> /// <param name="path">請求地址</param> /// <returns></returns> public async Task<bool> CheckClientRateLimitResultAsync(string clientid, string path) { var result = false; var clientRule = new List<AhphClientRateLimitOptions>(); //一、校驗路由是否有限流策略 result = !await CheckReRouteRuleAsync(path); if (!result) {//二、校驗客戶端是否被限流了 var limitResult = await CheckClientRateLimitAsync(clientid, path); result = !limitResult.RateLimit; clientRule = limitResult.rateLimitOptions; } if (!result) {//三、校驗客戶端是否啓動白名單 result = await CheckClientReRouteWhiteListAsync(clientid, path); } if (!result) {//四、校驗是否觸發限流及計數 result = CheckRateLimitResult(clientRule); } return result; } /// <summary> /// 檢驗是否啓用限流規則 /// </summary> /// <param name="path">請求地址</param> /// <returns></returns> private async Task<bool> CheckReRouteRuleAsync(string path) { var region = _options.RedisKeyPrefix + "CheckReRouteRuleAsync"; var key = region + path; var cacheResult = _ocelotCache.Get(key, region); if (cacheResult != null) {//提取緩存數據 return cacheResult.Role; } else {//從新獲取限流策略 var result = await _clientRateLimitRepository.CheckReRouteRuleAsync(path); _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校驗客戶端限流規則 /// </summary> /// <param name="clientid">客戶端ID</param> /// <param name="path">請求地址</param> /// <returns></returns> private async Task<(bool RateLimit, List<AhphClientRateLimitOptions> rateLimitOptions)> CheckClientRateLimitAsync(string clientid, string path) { var region = _options.RedisKeyPrefix + "CheckClientRateLimitAsync"; var key = region + clientid + path; var cacheResult = _rateLimitRuleCache.Get(key, region); if (cacheResult != null) {//提取緩存數據 return (cacheResult.RateLimit, cacheResult.rateLimitOptions); } else {//從新獲取限流策略 var result = await _clientRateLimitRepository.CheckClientRateLimitAsync(clientid, path); _rateLimitRuleCache.Add(key, new RateLimitRuleModel() { RateLimit=result.RateLimit, rateLimitOptions=result.rateLimitOptions }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校驗是否設置了路由白名單 /// </summary> /// <param name="clientid">客戶端ID</param> /// <param name="path">請求地址</param> /// <returns></returns> private async Task<bool> CheckClientReRouteWhiteListAsync(string clientid, string path) { var region = _options.RedisKeyPrefix + "CheckClientReRouteWhiteListAsync"; var key = region +clientid+ path; var cacheResult = _ocelotCache.Get(key, region); if (cacheResult != null) {//提取緩存數據 return cacheResult.Role; } else {//從新獲取限流策略 var result = await _clientRateLimitRepository.CheckClientReRouteWhiteListAsync(clientid,path); _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region); return result; } } /// <summary> /// 校驗完整的限流規則 /// </summary> /// <param name="rateLimitOptions">限流配置</param> /// <returns></returns> private bool CheckRateLimitResult(List<AhphClientRateLimitOptions> rateLimitOptions) { bool result = true; if (rateLimitOptions != null && rateLimitOptions.Count > 0) {//校驗策略 foreach (var op in rateLimitOptions) { AhphClientRateLimitCounter counter = new AhphClientRateLimitCounter(DateTime.UtcNow, 1); //分別對每一個策略校驗 var enablePrefix = _options.RedisKeyPrefix + "RateLimitRule"; var key = AhphOcelotHelper.ComputeCounterKey(enablePrefix, op.ClientId, op.Period, op.RateLimitPath); var periodTimestamp = AhphOcelotHelper.ConvertToSecond(op.Period); lock (_processLocker) { var rateLimitCounter = _clientRateLimitCounter.Get(key, enablePrefix); if (rateLimitCounter.HasValue) {//提取當前的計數狀況 // 請求次數增加 var totalRequests = rateLimitCounter.Value.TotalRequests + 1; // 深拷貝 counter = new AhphClientRateLimitCounter(rateLimitCounter.Value.Timestamp, totalRequests); } else {//寫入限流策略 _clientRateLimitCounter.Add(key, counter,TimeSpan.FromSeconds(periodTimestamp), enablePrefix); } } if (counter.TotalRequests > op.Limit) {//更新請求記錄,並標記爲失敗 result = false; } if (counter.TotalRequests > 1 && counter.TotalRequests <= op.Limit) {//更新緩存配置信息 //獲取限流剩餘時間 var cur = (int)(counter.Timestamp.AddSeconds(periodTimestamp) - DateTime.UtcNow).TotalSeconds; _clientRateLimitCounter.Add(key, counter, TimeSpan.FromSeconds(cur), enablePrefix); } } } return result; } } }
咱們來分析下這塊代碼,裏面涉及了限流的提取和實現規則,首先咱們注入了數據庫實體接口和緩存信息,實現步驟是參照以前的流程。
主要流程以下:
一、路由是否啓用限流,若是未啓用直接完成校驗,若是進行第2步判斷.
二、客戶端對應的路由是否設置了限流規則,若是未設置,直接完成校驗,不然進入第3步判斷.
三、客戶端是否開啓了路由白名單功能,若是開啓了直接完成校驗,不然進入第4步。
四、使用Redis來進行限流的判斷。使用的就是計數器方法,結合redis設置key的過時時間來實現的。
爲了減小後端請求,在數據庫提取的方法前都加入了緩存,如今咱們須要把用到的接口添加到入口進行注入。
builder.Services.AddSingleton<IOcelotCache<RateLimitRuleModel>, InRedisCache<RateLimitRuleModel>>(); builder.Services.AddSingleton<IOcelotCache<AhphClientRateLimitCounter?>, InRedisCache<AhphClientRateLimitCounter?>>();
如今咱們還剩下IClientRateLimitRepository
接口未實現,如今只要實現這個接口,而後注入下,咱們就完成了限流中間件的開發了,咱們根據限流的流程,梳理了實現,如今有3個方法須要進行實現。
新建SqlServerClientRateLimitRepository
類,來開始實現咱們與數據庫的操做,有了上面的分析思路,如今就是把一個一個詳細肯定的方法實現而已,太簡單了,只要花了幾分鐘後,就能夠瞬間寫出以下代碼。
using Ctr.AhphOcelot.Configuration; using Ctr.AhphOcelot.RateLimit; using Dapper; using System; using System.Collections.Generic; using System.Data.SqlClient; using System.Text; using System.Threading.Tasks; namespace Ctr.AhphOcelot.DataBase.SqlServer { /// <summary> /// 金焰的世界 /// 2018-11-19 /// 客戶端限流信息提取 /// </summary> public class SqlServerClientRateLimitRepository : IClientRateLimitRepository { private readonly AhphOcelotConfiguration _option; public SqlServerClientRateLimitRepository(AhphOcelotConfiguration option) { _option = option; } /// <summary> /// 校驗客戶端限流規則 /// </summary> /// <param name="clientid">客戶端ID</param> /// <param name="path">請求地址</param> /// <returns></returns> public async Task<(bool RateLimit, List<AhphClientRateLimitOptions> rateLimitOptions)> CheckClientRateLimitAsync(string clientid, string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT DISTINCT UpstreamPathTemplate AS RateLimitPath,LimitPeriod AS Period,LimitNum AS Limit,ClientId FROM AhphReRoute T1 INNER JOIN AhphReRouteLimitRule T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphLimitRule T3 ON T2.RuleId=T3.RuleId INNER JOIN AhphLimitGroupRule T4 ON T2.ReRouteLimitId=T4.ReRouteLimitId INNER JOIN AhphLimitGroup T5 ON T4.LimitGroupId=T5.LimitGroupId INNER JOIN AhphClientLimitGroup T6 ON T5.LimitGroupId=T6.LimitGroupId INNER JOIN AhphClients T7 ON T6.Id=T7.Id WHERE T1.InfoStatus=1 AND T1.UpstreamPathTemplate=@path AND T3.InfoStatus=1 AND T5.InfoStatus=1 AND ClientId=@clientid AND Enabled=1"; var result = (await connection.QueryAsync<AhphClientRateLimitOptions>(sql, new { clientid, path }))?.AsList(); if (result != null && result.Count > 0) { return (true, result); } else { return (false, null); } } } /// <summary> /// 校驗是否設置了路由白名單 /// </summary> /// <param name="clientid">客戶端ID</param> /// <param name="path">請求地址</param> /// <returns></returns> public async Task<bool> CheckClientReRouteWhiteListAsync(string clientid, string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT COUNT(1) FROM AhphReRoute T1 INNER JOIN AhphClientReRouteWhiteList T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphClients T3 ON T2.Id=T3.Id WHERE T1.InfoStatus=1 AND UpstreamPathTemplate=@path AND ClientId=@clientid AND Enabled=1"; var result = await connection.QueryFirstOrDefaultAsync<int>(sql, new { clientid,path }); return result > 0; } } /// <summary> /// 校驗是否啓用限流規則 /// </summary> /// <param name="path">請求地址</param> /// <returns></returns> public async Task<bool> CheckReRouteRuleAsync(string path) { using (var connection = new SqlConnection(_option.DbConnectionStrings)) { string sql = @"SELECT COUNT(1) FROM AhphReRoute T1 INNER JOIN AhphReRouteLimitRule T2 ON T1.ReRouteId=T2.ReRouteId INNER JOIN AhphLimitRule T3 ON T2.RuleId=T3.RuleId WHERE T1.InfoStatus=1 AND UpstreamPathTemplate=@path AND T3.InfoStatus=1"; var result = await connection.QueryFirstOrDefaultAsync<int>(sql, new { path }); return result > 0; } } } }
主要就是注意下表之間的關係,把實現注入到AddAhphOcelot
裏,如今就能夠測試開始自定義客戶端限流中間件。
builder.Services.AddSingleton<IClientRateLimitRepository, SqlServerClientRateLimitRepository>();
四、測試限流中間件
爲了把把全部狀況都測試一遍,先從開啓限流,什麼都不寫入看是否可以正常運行。
option.ClientRateLimit = true;
還記得咱們上篇的兩個客戶端和能訪問的頁面嗎?就用它們來測試,結果顯示正常,說明不開啓限流沒有影響。
開啓/cjy/values
2個限流規則,一個每1分鐘訪問1次,一個每1分鐘訪問60次。
--一、插入限流規則 INSERT INTO AhphLimitRule VALUES('每1分鐘訪問1次','1m',1,1); INSERT INTO AhphLimitRule VALUES('每1分鐘訪問60次','1m',60,1); --二、應用到/cjy/values路由 INSERT INTO AhphReRouteLimitRule VALUES(1,1); INSERT INTO AhphReRouteLimitRule VALUES(2,1);
由於還未給客戶端應用規則,因此應該也是能夠正常訪問,可使用PostMan
測試下,測試時須要注意下緩存,由於全部的訪問都啓用的默認緩存策略,經測試獲得預期效果。
如今開始把限流分別應用到客戶端1和客戶端2,看下限流效果。
--三、插入測試分組 INSERT INTO AhphLimitGroup VALUES('限流分組1','',1); INSERT INTO AhphLimitGroup VALUES('限流分組2','',1); --四、分組應用策略 INSERT INTO AhphLimitGroupRule VALUES(1,1); INSERT INTO AhphLimitGroupRule VALUES(2,2); --五、客戶端應用限流分組 INSERT INTO AhphClientLimitGroup VALUES(2,1); INSERT INTO AhphClientLimitGroup VALUES(3,2);
而後使用PostMan
測試客戶端1和客戶端2,結果以下,超過設置的頻率後不返回結果,達到預期目的,可是返回的是404錯誤,強迫症患者表示這不優雅啊,應該是429 Too Many Requests
,那咱們如何修改呢?
這裏就須要瞭解下錯誤信息是如何輸出的,須要查看Ocelot
源碼,您會發現IErrorsToHttpStatusCodeMapper
接口和ErrorsToHttpStatusCodeMapper
實現,代碼以下,
using System.Collections.Generic; using System.Linq; using Ocelot.Errors; namespace Ocelot.Responder { public class ErrorsToHttpStatusCodeMapper : IErrorsToHttpStatusCodeMapper { public int Map(List<Error> errors) { if (errors.Any(e => e.Code == OcelotErrorCode.UnauthenticatedError)) { return 401; } if (errors.Any(e => e.Code == OcelotErrorCode.UnauthorizedError || e.Code == OcelotErrorCode.ClaimValueNotAuthorisedError || e.Code == OcelotErrorCode.ScopeNotAuthorisedError || e.Code == OcelotErrorCode.UserDoesNotHaveClaimError || e.Code == OcelotErrorCode.CannotFindClaimError)) { return 403; } if (errors.Any(e => e.Code == OcelotErrorCode.RequestTimedOutError)) { return 503; } if (errors.Any(e => e.Code == OcelotErrorCode.UnableToFindDownstreamRouteError)) { return 404; } if (errors.Any(e => e.Code == OcelotErrorCode.UnableToCompleteRequestError)) { return 500; } return 404; } } }
能夠發現由於未定義RateLimitOptionsError
錯誤的狀態碼,增長一個判斷便可,那咱們重寫下把,而後集成在咱們本身的中間件裏,這塊在後期有不少擴展可以用到,增長以下代碼。
if (errors.Any(e => e.Code == OcelotErrorCode.RateLimitOptionsError)) { return 429; }
而後從新注入下。
builder.Services.AddSingleton<IErrorsToHttpStatusCodeMapper, AhphErrorsToHttpStatusCodeMapper>();
在從新測試下訪問限流地址。
奈斯,達到了咱們預期的效果,.netcore
開發魅力體現出來了嗎?
咱們增長客戶端1的路由白名單,而後再繼續測試看是否解除限流限制?
--六、設置客戶端1/cjy/values路由白名單 INSERT INTO AhphClientReRouteWhiteList VALUES(1,2);
注意測試時清除緩存
經測試不受限流控制,達到了咱們最終目的,到此限流功能所有實現。
五、增長mysql支持
直接重寫IClientRateLimitRepository
實現,而後注入實現。
builder.Services.AddSingleton<IClientRateLimitRepository, MySqlClientRateLimitRepository>();
本篇咱們講解的是網關如何實現自定義客戶端限流功能,從設計到實現一步一步詳細講解,雖然只用一篇就寫完了,可是涉及的知識點仍是很是多的,但願你們認真理解實現的思想,看我是如何從規劃到實現的,爲了更好的幫助你們理解。你們能夠根據博客內容本身手動實現下,有利於消化,若是在操做中遇到什麼問題,能夠加.NET Core項目實戰交流羣(QQ羣號:637326624)
諮詢做者。
從下一篇開始介紹IdentityServer4
的相關應用,並配合咱們的網關實現認證,在跟我教程學習的朋友,能夠本身先預習下。