[超簡潔]EasyQ框架-應對WEB高併發業務(秒殺、抽獎)等業務

背景介紹git

    這幾年一直在摸索一種框架,足夠簡單,又能應付不少高併發高性能的需求。研究過一些框架思想如DDD DCI,也實踐過CQRS框架。github

可是總以爲複雜度高,門檻也高,本身學都吃力,若是團隊新人更難接受。因此自從寫了最簡單的BaseContext類以後很長一段時間內都沒有加任何代碼。(basecontext只有10行內代碼)數據庫

以前有個秒殺業務要作,用了MVC的異步Action隊列處理請求,感受仍是蠻不錯,因此跟另一位同事一同把這個功能整合進這個baseContext裏面,既沒有用第三方的Queue(如 RabbitMQ )也沒有另外開一個宿主進程Exe。api

總之「simple is good!」安全

EasyQ服務器

EasyQ是一個輕量級的專門用來處理高併發HTTP請求的框架。
應用MVC異步Action機制實現了相同業務單線程排隊處理,沒有用任何讀寫鎖。
能夠指定某一種業務建立一條隊列,也能夠指定某一種業務+某一種數據ID做爲一條隊列。
如秒殺商品業務,能夠指定全部秒殺業務都使用單線程排隊處理,避免髒讀 髒寫。
可是這樣作的話,全部秒殺商品都會進入排隊,顯然是不科學的。
因此擴展一種方式是: 秒殺業務+商品ID 做爲隊列名。
固然不止商品ID,也能夠是用戶ID,商品分類等任意字符串做爲隊列名的後綴。
  GITHUB地址:https://github.com/BTteam/EasyQ多線程

如能佔用您一點時間,提出一點改進的意見,不勝感激!併發


使用說明框架

HomeController 是入口頁面,須要繼承AsyncController,使用MVC的異步Action
BT.Contexts項目放置業務代碼,全部Context須要繼承抽象類QueueBaseContext,而且實現3個方法
1,InitData 初始化數據,數據庫獲取數據的方法應該寫在此處
2,Interact 交互操做,數據模型之間的交互,業務代碼的各類計算、判斷等
3,Persist 持久化操做,數據保存到數據庫的操做應當寫在此處。
這3個方法的默認執行步驟很是簡單 1=》2=》3
  這個類是封裝了隊列、線程的操做,是EasyQ的核心類。
在HomeController使用Context時,首先應該分開2個Action 如 TestAsync TestCompleted。這是MVC異步Action的機制決定
TestAsync用來啓動異步,TestCompleted是異步完成後的回調操做。這2個方法必須成對出現。具體原理請參考MSDN異步

調用是URL爲:{host}/home/text 注意Async後綴在路由時會被去掉。
SetAsync方法必須傳入AsyncManager對象,key是可選參數,如上所述是用來細分隊列的。
若是想根據商品ID生成隊列,不一樣商品的秒殺行爲在不一樣的隊列中排隊,就在此處用SetAsync傳入key是商品ID

public void TestAsync(string key)
{
//GET DATA
TestContext context = new TestContext(1);
context.SetAsync(AsyncManager, key);//參數爲產品隊列標識
context.Execute();
}

 

再看回調方法

public ActionResult TestCompleted()
{
var result = AsyncManager.Parameters["response"];

return Content(JsonConvert.SerializeObject( result));
}

 全部Context執行後的結果以Parameters["response"]返回

 

核心解析

QueueBaseContext類

public abstract class  QueueBaseContext:BaseContext
    {
       private ILog log = LogManager.GetLogger(typeof(QueueBaseContext));
       private static ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>> killQueues = new ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>>();
        private static ConcurrentDictionary<string, Task> taskDic = new ConcurrentDictionary<string, Task>();
       //場景使用步驟  編寫好 1.Interact() 2.Persist() 3.在api調用初始場景後,調用QueueContextAsync()
        private AsyncManager AsyncManager;
        private string quenekey;

       public void SetAsync(AsyncManager _AsyncManager)
       {
           SetAsync(_AsyncManager, "");
       }
       public void SetAsync(AsyncManager _AsyncManager, string _quenekey)
       {
           quenekey = _quenekey;
           this.AsyncManager = _AsyncManager;
       }

       public abstract void InitData();
       public override string Execute()
       {
           if (AsyncManager == null)
           {
               throw new Exception("必須調用SetAsync 設置AsyncManager對象");

           }
         var runtimeType=  this.GetType();
           var qKey = runtimeType.FullName + quenekey;
          // typeof().
           AsyncManager.OutstandingOperations.Increment();
           //開一個隊列 判斷是否有隊列 
           if (killQueues.ContainsKey(qKey) == false)
           {
               killQueues.TryAdd(qKey, new ConcurrentQueue<AsyncManager>(new[] {AsyncManager}));
           }
           else
           {
               killQueues[qKey].Enqueue(AsyncManager);

           }
           Action ac = () =>
           {
              
               while (killQueues[qKey].IsEmpty == false)
               {
                 //  Thread.Sleep(15000);
                   log.DebugFormat("while 進來了  killQueueitemCount length:{0} ,Q num{1}", killQueues[qKey].Count, killQueues.Count);
                   AsyncManager item;
                   killQueues[qKey].TryDequeue(out item);//取出隊列的一個進行處理
                   try
                   {

                       InitData();
                       if (Interact())//對應業務邏輯
                           Persist();

                       AsyncManager.Parameters["response"] = new { Code = this.StatusCode};
                       AsyncManager.OutstandingOperations.Decrement();
                   }
                   catch (Exception e)
                   {
                       log.ErrorFormat("出錯,e msg:{0} ,trace:{1}", e.Message, e.StackTrace);
                       AsyncManager.Parameters["response"] = new { Code = ResponseCode.DataError, Description = "服務器錯誤,請重試" };
                       AsyncManager.OutstandingOperations.Decrement();
                   }
               }
               //remove q
           };
           if (taskDic.ContainsKey(qKey) == false)
           {
               taskDic.TryAdd(qKey, Task.Factory.StartNew(ac));
           }
           if (taskDic[qKey].IsCompleted || taskDic[qKey].IsFaulted)
           {
               taskDic[qKey] = Task.Factory.StartNew(ac);
           }
           return "";
       }
     

    }

構建了2個字典 

killQueues :隊列名做爲KEY 隊列實例做爲Value
taskDic:隊列名做爲KEY 隊列指定的執行Task做爲Value

處理邏輯是建立一個Task 循環隊列每次取出一個項,執行業務操做。直到隊列爲空。
若是在上一個Task運行過程當中有新的請求加入,則不須要新建Task,只須要繼續加入隊列尾部。上一個Task會執行對應的業務操做。
隊列中的每個元素表明一次業務操做,操做完畢以後會調用
AsyncManager.OutstandingOperations.Decrement();
用來返回異步結果給請求線程。這樣HTTP請求結果就返回給用戶了。不須要等到隊列完畢。
qKey能夠設置任意字符串,用來細分隊列名稱。

隊列最好用
ConcurrentQueue
由於在入列出列操做時處於多線程共享隊列,必需要用線程安全的隊列類。


存在缺陷

1 目前是單點設計,只能在單機上運行,還在研究橫向擴展。 2 性能還須要優化 3 因爲使用異步Action 致使每一個Action必須一分爲二。

相關文章
相關標籤/搜索