.Net 如何模擬會話級別的信號量,對http接口調用頻率進行限制(有demo)

如今,由於種種因素,你必須對一個請求或者方法進行頻率上的訪問限制。
好比, 你對外提供了一個API接口,註冊用戶每秒鐘最多能夠調用100次,非註冊用戶每秒鐘最多能夠調用10次。
好比, 有一個很是吃服務器資源的方法,在同一時刻不能超過10我的調用這個方法,不然服務器滿載。
好比, 有一些特殊的頁面,訪客並不能頻繁的訪問或發言。
好比, 秒殺活動等進行。
好比 ,防範DDOS,當達到必定頻率後調用腳本iis服務器ip黑名單,防火牆黑名單。
如上種種的舉例,也就是說,如何從一個切面的角度對調用的方法進行頻率上的限制。而對頻率限制,服務器層面都有最直接的解決方法,如今我說的則是代碼層面上的頻率管控。html

本文給出兩個示例,一個是基於單機環境的實現,第二個則是基於分佈式的Redis實現web

--------------------redis

以第一個API接口需求爲例,先說下單機環境下的實現。
按照慣性思惟,咱們天然會想到緩存的過時策略這種方法,可是嚴格來說就HttpRuntime.Cache而言,經過緩存的過時策略來對請求進行頻率的併發控制是不合適的。
  HttpRuntime.Cache 是應用程序級別的Asp.Net的緩存技術,經過這個技術能夠申明多個緩存對象,能夠爲每一個對象設置過時時間,當過時時間到達後該緩存對象就會消失(也就是當你訪問該對象的時候爲Null)算法

  爲何這樣說呢?好比對某個方法(方法名:GetUserList)咱們要進行1秒鐘最多10次的限制,如今咱們就新建一個int型的Cache對象,而後設置1秒鐘後過時消失。那麼每當訪問GetUserList方法前,咱們就先判斷這個Cache對象的值是否大於10,若是大於10就不執行GetUserList方法,若是小於10則容許執行。每當訪問該對象的時候若是不存在或者過時就新建,這樣周而復始,則該對象永遠不可能超過10。sql

1   if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大於10請求失敗 2  { 3      Console.WriteLine("禁止請求"); 4  } 5   else
6  { 7      HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] + 1; //不然該緩存對象的值+1 8      Console.WriteLine("容許請求"); 9   }

這樣的思想及實現相對來講很是簡單,可是基於這樣的一個模型設定,那麼就會出現這種狀況:數據庫

 

 

如上圖,每一個點表明一次訪問請求,我在0秒的時候 新建了一個名字爲GetUserListNum的緩存對象。
在0~0.5秒期間 我訪問了3次在0.5~1秒期間,咱們訪問了7次。此時,該對象消失,而後咱們接着訪問,該對象重置爲0.
                在第1~1.5秒期間,仍是訪問了7次,在第1.5秒~2秒期間訪問了3次。緩存

基於這種簡單緩存過時策略的模型,在這2秒鐘內,咱們雖然平均每秒鐘都訪問了10次,知足這個規定,可是若是咱們從中取一個期間段,0.5秒~1.5秒期間,也是1秒鐘,可是卻實實在在的訪問了14次!遠遠超過了咱們設置的 1秒鐘最多訪問10次的 限制。安全

 

那麼如何科學的來解決上面的問題呢?咱們能夠經過模擬會話級別的信號量這一手段,這也就是咱們今天的主題了。
   什麼是信號量?僅就以代碼而言,  static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5);  它的意思就表明在多線程狀況下,在任何一時刻,只能同時5個線程去訪問。服務器

 

4容器4線程模型

如今,在實現代碼的以前咱們先設計一個模型。網絡

  假設咱們有一個用戶A的管道,這個管道里裝着用戶A的請求,好比用戶A在一秒鐘發出了10次請求,那麼每個請求過來,管道里的元素都會多一個。可是咱們設定這個管道最多隻能容納10個元素,並且每一個元素的存活期爲1秒,1秒後則該元素消失。那麼這樣設計的話,不管是速率仍是數量的突進,都會有管道長度的限制。這樣一來,不管從哪個時間節點或者時間間隔出發,這個管道都能知足咱們的頻率限制需求。

而這裏的管道,就必須和會話Id來對應了。每當有新會話進來的時候就生成一個新管道。這個會話id根據本身場景所定,能夠是sessionId,能夠是ip,也能夠是token。

那麼既然這個管道是會話級別的,咱們確定得須要一個容器,來裝這些管道。如今,咱們以IP來命名會話管道,並把全部的管道都裝載在一個容器中,如圖

而基於剛纔的設定,咱們還須要對容器內的每條管道的元素進行處理,把過時的給剔除掉,爲此,還須要單獨爲該容器開闢出一個線程來爲每條管道進行元素的清理。而當管道的元素爲0時,咱們就清掉該管道,以便節省容器空間。

 

固然,因爲用戶量多,一個容器內可能存在上萬個管道,這個時候僅僅用一個容器來裝載來清理,在效率上顯然是不夠的。這個時候,咱們就得對容器進行橫向擴展了。

  好比,咱們能夠根據Cpu核心數自動生成對應的數量的容器,而後根據一個算法,對IP來進行導流。我當前cpu是4個邏輯核心,就生成了4個容器,每當用戶訪問的時候,都會最早通過一個算法,這個算法會對IP進行處理,如192.168.1.11~192.168.1.13這個Ip段進第一個容器,xxx~xxx進第二個容器,依次類推,相應的,也就有了4個線程去分別處理4個容器中的管道。

 

那麼,最終就造成了咱們的4容器4線程模型了。

如今,着眼於編碼實現:

  首先咱們須要一個能承載這些容器的載體,這個載體相似於鏈接池的概念,能夠根據一些須要自動生成適應數量的容器,若是有特殊要求的話,還能夠在容器上切出一個容器管理的面,在線程上切出一個線程管理的面以便於實時監控和調度。若是真要作這樣一個系統,那麼 容器的調度 和 線程的調度功能 是必不可少的,而本Demo則是完成了主要功能,像容器和線程在代碼中我也沒剝離開來,爲了更好的直觀的體現demo的含義,算法也是直接寫死的,集合容器的內存管理與元素的選型待優化,實際設計中,這些都得優化,還有多線程模型中,怎樣上鎖才能讓效率最大化也是重中之重的。

而這裏爲了案例的直觀就直接寫死成4個容器。

public static List<Container> ContainerList = new List<Container>(); //容器載體 static Factory() { for (int i = 0; i < 4; i++) { ContainerList.Add(new Container(i)); //遍歷4次 生成4個容器 } foreach (var item in ContainerList) { item.Run(); //開啓線程 } }

如今,咱們假定 有編號爲 0 到 40 這樣的 41個用戶。那麼這個導流算法 我也就直接寫死,編號0至9的用戶 將他們的請求給拋轉到第一個容器,編號10~19的用戶 放到第二個容器,編號20~29放到第三個容器,編號30~40的用戶放到第四個容器。

那麼這個代碼就是這樣的:

static Container GetContainer(int userId, out int i) //獲取容器的算法 { if (0 <= userId && userId < 10) //編號0至9的用戶  返回第一個容器 依次類推 { i = 0; return ContainerList[0]; } if (10 <= userId && userId < 20) { i = 1; return ContainerList[1]; } if (20 <= userId && userId < 30) { i = 2; return ContainerList[2]; } i = 3; return ContainerList[3]; }

當咱們的會話請求通過算法的導流以後,都必須調用一個方法,用於辨別管道數量。若是管道數量已經大於10,則請求失敗,不然成功

public static void Add(int userId) { if (GetContainer(userId, out int i).Add(userId)) Console.WriteLine("容器" + i + " 用戶" + userId + " 發起請求"); else Console.WriteLine("容器" + i + " 用戶" + userId + " 被攔截"); }

接下來就是容器Container的代碼了。

這裏,對容器的選型用線程安全的ConcurrentDictionary類。
  線程安全:當多個線程同時讀寫同一個共享元素的時候,就會出現數據錯亂,迭代報錯等安全問提
  ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0專爲解決Dictionary線程安全而出的新類型
  ReaderWriterLockSlim:較ReaderWriterLock優化的讀寫鎖,多個線程同時訪問讀鎖 或  一個線程訪問寫鎖

private ReaderWriterLockSlim obj = new ReaderWriterLockSlim(); //在每一個容器中申明一個讀寫鎖 public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //建立該容器 dic

而後當你向容器添加一條管道中的數據是經過這個方法:

public bool Add(int userId) { obj.EnterReadLock();//掛讀鎖,容許多個線程同時寫入該方法 try { ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(),t=>{ new ConcurrentList<DateTime>()}); //若是不存在就新建 ConcurrentList return dtList.CounterAdd(10, DateTime.Now); //管道容量10,當臨界管道容量後 返回false } finally { obj.ExitReadLock(); } }

 這裏,爲了在後面的線程遍歷刪除ConcurrentList的管道的時候保證ConcurrentList的安全性,因此此處要加讀鎖。

 而ConcurrentList,由於.Net沒有推出List集合類的線程安全(這裏我申明下:之因此不用ConcurrentBag是由於要保證count和add的一致性,這裏補充一下),因此本身新建了一個繼承於List<T>的安全類型,在這裏 封裝了3個須要使用的方法。

public class ConcurrentList<T> : List<T> { private object obj = new object(); public bool CounterAdd(int num, T value) { lock (obj) { if (base.Count >= num) return false; else
                base.Add(value); return true; } } public new bool Remove(T value) { lock (obj) { base.Remove(value); return true; } } public new T[] ToArray() { lock (obj) { return base.ToArray(); } } }

最後就是線程的運行方法:

public void Run() { ThreadPool.QueueUserWorkItem(c => { while (true) { if (dic.Count > 0) { foreach (var item in dic.ToArray()) { ConcurrentList<DateTime> list = item.Value; foreach (DateTime dt in list.ToArray())   
 { if (DateTime.Now.AddSeconds(-3) > dt) { list.Remove(dt); Console.WriteLine("容器" + seat + " 已刪除用戶" + item.Key + "管道中的一條數據"); } } if (list.Count == 0) { obj.EnterWriteLock(); try { if (list.Count == 0) { if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i)) { Console.WriteLine("容器" + seat + " 已清除用戶" + item.Key + "的List管道"); } } } finally { obj.ExitWriteLock(); } } } } else { Thread.Sleep(100); } } } ); }

最後,是效果圖,一個是基於控制檯的,還一個是基於Signalr的。

 

分佈式下Redis

上面介紹了一種頻率限制的模型,分佈式與單機相比,無非就是載體不一樣,咱們只要把這個容器的載體從程序上移植出來,來弄成一個單獨的服務或者直接借用Redis也是可行的。

這裏就介紹分佈式狀況下,Redis的實現。

不一樣於Asp.Net的多線程模型,大概由於Redis的各類類型的元素很是粒度的操做致使各類加鎖的複雜性,因此在網絡請求處理這塊Redis是單線程的,基於Redis的實現則由於單線程的緣故在編碼角度不用太多考慮到與邏輯無關的問題。

  簡單介紹下,Redis是一個內存數據庫,這個數據庫屬於非關係型數據庫,它的概念不一樣於通常的咱們認知的Mysql Oracle SqlServer關係型數據庫,它沒有Sql沒有字段名沒有表名這些概念,它和HttpRunTime.Cache的概念差很少同樣,首先從操做上屬於鍵值對模式,就如 Cache["鍵名"] 這樣就能獲取到值相似,並且能夠對每一個Key設置過時策略,而Redis中的Key所對應的值並非想存啥就存啥的,它支持五種數據類型:string(字符串),hash(哈希),list(列表),set(集合)及sorted set(有序集合)。

今天要說的是Sorted set有序集合,有序集合相比其它的集合類型的特殊點在於,使用有序集合的時候還能給插入的元素指定一個 積分score,咱們把這個積分score理解爲排序列,它內部會對積分進行排序,積分容許重複,而有序集合中的元素則是惟一。

  仍是一樣的思路,每當有用戶訪問的時候,都對該用戶的 管道(有序集合)中添加一個元素,而後設置該元素的積分爲當前時間。接着在程序中開個線程,來對管道中積分小於約定時間的元素進行清理。由於規定有序集合中的元素只能是惟一值,因此在賦值方面只要是知足uuid便可。

 

那麼用Redis來實現的代碼那就是相似這種:

經過using語法糖實現IDisposable而包裝的Redis分佈式鎖,而後裏面正常的邏輯判斷。

這樣的代碼雖然也能完成功能,但不夠友好。Redis是個基於內存的數據庫,於性能而言,瓶頸在於網絡 IO 上,與Get一次發出一次請求相比,能不能經過一段腳原本實現大部分邏輯呢?

有的,Redis支持 Lua腳本:
  Lua 是一種輕量小巧的腳本語言,用標準C語言編寫並以源代碼形式開放, 其設計目的是爲了嵌入應用程序中,從而爲應用程序提供靈活的擴展和定製功能。
  大體意思就是,直接向Redis發送一段腳本或者讓它直接本地讀取一段腳本從而直接實現全部的邏輯。

/// <summary>
/// 若是 大於10(AccountNum) 就返回1 不然就增長一條集合中的元素 並返回 空 /// </summary>
/// <param name="zcardKey"></param>
/// <param name="score"></param>
/// <param name="zcardValue"></param>
/// <param name="AccountNum"></param>
/// <returns></returns>
public string LuaAddAccoundSorted(string zcardKey, double score, string zcardValue, int AccountNum) { string str = "local uu = redis.call('zcard',@zcardKey) if (uu >=tonumber(@AccountNum)) then return 1 else redis.call('zadd',@zcardKey,@score,@zcardValue) end"; var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str), new { zcardKey = zcardKey, score = score, zcardValue = zcardValue, AccountNum=AccountNum }); return re.ToString(); }

local uu就是申明一個爲名uu的變量的意思,redis.call就是redis命令,這段腳本意思就是若是 大於10(AccountNum) 就返回1   不然就增長一條集合中的元素 並返回 空。

管道內元素處理的方法就是:

/// <summary>
 /// 遍歷當前全部前綴的有序集合,若是數量爲0,那麼就返回1 不然 就刪除 知足最大分值條件區間的元素,若是該集合個數爲0則消失 /// </summary>
 /// <param name="zcardPrefix"></param>
 /// <param name="score"></param>
 /// <returns></returns>
public string LuaForeachRemove(string zcardPrefix, double score) { StringBuilder str = new StringBuilder(); str.Append("local uu = redis.call('keys',@zcardPrefix) "); //聲明一個變量 去獲取 模糊查詢的結果集合 str.Append("if(#uu==0) then"); //若是集合長度=0 str.Append(" return 1 "); str.Append("else "); str.Append(" for i=1,#uu do "); //遍歷 str.Append(" redis.call('ZREMRANGEBYSCORE',uu[i],0,@score) "); //刪除從0 到 該score 積分區間的元素 str.Append(" if(redis.call('zcard',uu[i])==0) then "); //若是管道長度=0 str.Append(" redis.call('del',uu[i]) "); //刪除 str.Append(" end "); str.Append(" end "); str.Append("end "); var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str.ToString()), new { zcardPrefix = zcardPrefix + "*", score = score }); return re.ToString();

這2段代碼經過發送Lua腳本的形式來完成了整個過程,由於Redis的網絡模型緣由,因此把LuaForeachRemove方法給提出來作個服務來單獨處理便可。至於那種多容器多線程的實現,則徹底能夠開多個Redis的實例來實現。最後放上效果圖。

最後,我把這些都給作成了個Demo。可是沒有找到合適的上傳網盤,因此你們能夠留郵箱(留了就發),或者直接加QQ羣文件自取,討論交流:166843154

 

我喜歡和我同樣的人交朋友,不被環境影響,本身是本身的老師,歡迎加羣 .Net web交流羣, QQ羣:166843154 慾望與掙扎

 

做者:小曾出處:http://www.cnblogs.com/1996V/p/8127576.html 歡迎轉載,但任何轉載必須保留完整文章及博客園出處,在顯要地方顯示署名以及原文連接。.Net交流羣, QQ羣:166843154 慾望與掙扎 
相關文章
相關標籤/搜索