NoSQL初探之人人都愛Redis:(3)使用Redis做爲消息隊列服務場景應用案例

1、消息隊列場景簡介

  「消息」是在兩臺計算機間傳送的數據單位。消息能夠很是簡單,例如只包含文本字符串;也能夠更復雜,可能包含嵌入對象。消息被髮送到隊列中,「消息隊列」是在消息的傳輸過程當中保存消息的容器javascript

  在目前普遍的Web應用中,都會出現一種場景:在某一個時刻,網站會迎來一個用戶請求的高峯期(好比:淘寶的雙十一購物狂歡節,12306的春運搶票節等),通常的設計中,用戶的請求都會被直接寫入數據庫或文件中,在高併發的情形下會對數據庫服務器或文件服務器形成巨大的壓力,同時呢,也使響應延遲加重。這也說明了,爲何咱們當時那麼地抱怨和吐槽這些網站的響應速度了。當時2011年的京東圖書促銷,曾一直出如今購物車中點擊「購買」按鈕後一直是「Service is too busy」,其實就是由於當時的併發訪問量過大,超過了系統的最大負載能力。固然,後邊,劉強東臨時購買了很多服務器進行擴展以求加強處理併發請求的能力,還請了信息部的人員「喝茶」,如今京東已是超大型的網上商城了,我也有同窗在京東成都研究院工做了。css

使用消息隊列

  從京東當年的「Service is too busy」不難看出,高併發的用戶請求是網站成長過程當中必不可少的過程,也是一個必需要解決的難題。在衆多的實踐當中,除了增長服務器數量配置服務器集羣實現伸縮性架構設計以外,異步操做也被普遍採用。而異步操做中最核心的就是使用消息隊列,經過消息隊列,將短期高併發產生的事務消息存儲在消息隊列中,從而削平高峯期的併發事務,改善網站系統的性能。在京東之類的電子商務網站促銷活動中,合理地使用消息隊列,能夠有效地抵禦促銷活動剛開始就開始大量涌入的訂單對系統形成的衝擊html

  記得我在實習期間,成都市XXXX局的一個價格信息採集發佈系統項目中有一個採集任務發佈的模塊,其中每一個任務都是一個事務,這個事務中須要向數據庫中不斷地插入行,每一個任務發佈時都要往表中插入幾百行甚至幾千行的任務數據(好比價格採集日報,每每須要發佈2-3年的任務數據,每一天都是一個任務,因此大約有2,3千行任務期號數據,還要發給不少個區縣的監測中心,所以數據庫寫操做量很大,更別說同時發佈的併發操做),因爲業務邏輯的處理比較複雜和往數據庫的寫操做量交大,因此在沒有采用消息隊列時點擊「發佈」按鈕後每每須要等待1分鐘左右的時間才提示「發佈成功」,用戶體驗極不友好。java

  這時,咱們就可使用消息隊列的思想來重構這個發佈模塊,在用戶點擊「發佈」按鈕後,系統只須要把往數據庫插入的這個事務信息插入到指定的任務發佈消息隊列裏邊去(入隊操做,這裏通常有一臺獨立的消息隊列服務器來單獨存儲和處理),而後系統就能夠當即對用戶的這個發佈請求進行響應(好比給出一個發佈成功的操做提示,這裏暫不考慮消息隊列服務操做失敗的情形,若是失敗了,能夠考慮採用給用戶發送郵件、短信或站內消息,讓其從新進行發佈操做)。redis

隊列結構

  最後,消息隊列服務器中有一個進程單獨對消息隊列進行處理,首先判斷消息隊列中是否有待處理的消息,若是有,則將其取出(出隊操做,堅持「先進先出」的順序,保證事務的準確性)進行相應地處理(好比這裏是進行保存數據的操做,將數據插入到數據庫服務器中的指定數據庫裏邊,實質仍是文件的IO操做)。就這樣,經過消息隊列將高併發用戶請求進行異步操做,而後一一對消息隊列進行出隊的同步操做,也避免了併發控制的難題。sql

  說到這裏,你們可能會想到這尼瑪不就是生產者消費者模式麼?對的,麼麼嗒,消息隊列就是生產者消費者模式的典型場景。簡單地說,客戶端不一樣用戶發送的操做請求就是生產者,他們將要處理的事務存儲到消息隊列中,而後消息隊列服務器的某個進程不停地將要處理的單個事務從消息隊列中一個一個地取出來進行相應地處理,這就是消費者消費的過程。數據庫

  下面咱們將以異常日誌爲案例,介紹在.Net中如何採用消息隊列的思想解決併發問題。固然,消息隊列只是解決併發問題的其中一種方式,在實際中每每須要結合多種不一樣的技術方式來共同解決,好比負載均衡、反向代理、集羣等方案。這裏,雖然以異常日誌爲案例,可是「麻雀雖小五臟俱全」,日誌寫入文件的高併發操做也一樣適用於數據庫的高併發,因此,研究這個案例是具備實際意義的。數組

2、使用預置類型實現異常日誌隊列

  在平常的Web應用中,異常日誌的記錄是一個十分重要的要點。由於,人無完人,系統也同樣,不免會在何時出一個測試階段未能徹底測試到的異常。這時候,不能將異常信息直接顯示給客戶,那樣既不友好也不安全。因此,通常都採用將異常信息記錄到日誌文件中(好比某個txt文件,數據庫中某個表等),而後技術支持人員經過查看異常日誌,分析異常緣由,改進BUG從新發布,保障系統正常運行。安全

  在用戶的各類操做中,若是出現異常的時間一致,那麼記錄異常日誌的操做就會成爲併發操做,而記錄異常日誌又屬於文件的IO操做(其實數據庫的讀寫歸根結底也是對文件即對磁盤進行的IO操做),所以頗有可能帶來併發控制的一系列問題。在以往的編碼實踐中,咱們能夠經過給不一樣的IO請求進行加鎖(C#中的lock),等第一個請求完成寫入後釋放鎖,第二個請求再得到鎖,進行IO操做,而後釋放掉,一直到第N個請求釋放後結束。這種方式,雖然解決了併發操做帶來的問題,可是經過加鎖延遲了用戶響應請求的時間(好比第一個正在IO寫入操做時,後面的均處於等待狀態),而且加鎖也會給服務器帶來必定的性能負擔,形成服務器性能的降低。服務器

  基於以上緣由,咱們採用消息隊列的思想將異常日誌的記錄操做改成隊列版,這裏咱們先不採用Redis,直接使用.Net爲咱們提供的預置類型-Queue。接下來,就讓咱們動手開刀,寫起來。

  (1)新建一個ASP.NET MVC 4項目,選擇「基本」類型,視圖引擎選擇「Razor」。

  (2)既然是異常日誌記錄,首先得有異常。這時,咱們腦海中想到了那個經典的異常:DividedByZeroException。因而,在Controllers文件夾中新建一個Controller,取名爲Home(這裏由於Global文件中的默認路由就指向了Home控制器中的Index這個Action),在HomeController中修改Index這個Action的代碼以下:

複製代碼
        public ActionResult Index()
        {
            int a = 10;
            int b = 0;
            int c = a / b; //會拋一個DividedByZero的異常

            return View();
        }
複製代碼

  (3)在ASP.NET MVC項目中,咱們須要在Global.asax中的Application_Start這個事件中修改全局過濾器(主要是App_Start中的FilterConfig類的RegisterGlobalFilters這個方法),讓系統支持對異常的全局處理操做(咱們這裏主要是對異常進行記錄到指定文件中)。PS:Application_Start是整個Web應用的起始事件,主要進行一些配置(如過濾器配置、日誌器配置、路由配置等等)的初始化操做,固然這些配置也只會進行一次。

複製代碼
    public class FilterConfig
    {
        public static void RegisterGlobalFilters(GlobalFilterCollection filters)
        {
            // MyExceptionFilterAttribute繼承自HandleError,主要做用是將異常信息寫入日誌文件中
            filters.Add(new MyExceptionFilterAttribute());
            // 默認的異常記錄類
            filters.Add(new HandleErrorAttribute());
        }
    }
複製代碼

  經過改寫過濾器配置,咱們向全局過濾器中註冊了一個異常處理的過濾器配置,那麼這個MyExceptionFilterAttribute類又是如何編寫的呢?

複製代碼
    public class MyExceptionFilterAttribute : HandleErrorAttribute
    {
        //版本1:使用預置隊列類型存儲異常對象
        public static Queue<Exception> ExceptionQueue = new Queue<Exception>();

        public override void OnException(ExceptionContext filterContext)
        {
            //將異常信息入隊
            ExceptionQueue.Enqueue(filterContext.Exception);
            //跳轉到自定義錯誤頁
            filterContext.HttpContext.Response.Redirect("~/Common/CommonError.html");

            base.OnException(filterContext);
        }
    }
複製代碼

  經過使該類繼承HandlerErrorAttribute並使其覆寫OnException這個事件,表明在異常發生時能夠進行的操做。而咱們在這兒主要經過一個異常隊列將獲取的異常寫入隊列,而後跳轉到自定義錯誤頁:~/Common/CommonError.html,這個錯誤頁很簡單,就是簡單的顯示「系統發生錯誤,5秒後自動跳轉到首頁」

複製代碼
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <meta name="viewport" content="width=device-width" />
    <title>錯誤</title>
    <style type="text/css">
        .timecss
        {
            color: red;
            font-weight: bold;
        }
    </style>
    <script type="text/javascript">
        function delayJump(url) {
            var timeValue = parseInt(document.getElementById("time").innerHTML);
            if (timeValue > 0) {
                timeValue--;
                document.getElementById("time").innerHTML = timeValue;
            }
            else {
                window.location.href = url;
            }
            setTimeout("delayJump('" + url + "')", 1000);
        }
    </script>
</head>
<body>
    <h2>抱歉,處理您的請求時出錯。將會在<span id="time" class="timecss">5</span>秒後自動跳轉到首頁,請耐心等候。
    </h2>
</body>
<script type="text/javascript">
    var destUrl = "/Home/NoError";
    delayJump(destUrl);
</script>
</html>
複製代碼

  (4)走到這裏,生產者消費者模式中生產者的任務已經完成了,接下來消費者就須要開始消費了。也就是說,消息隊列已經建好了,咱們何時從隊列中去任務,在哪裏執行?怎麼樣執行?經過上面的介紹,咱們知道,在專門的消息隊列服務器中有一個進程在始終不停地監視消息隊列,若是有須要待辦的任務信息,則會當即從隊列中取出來執行相應的操做,直到隊列爲空爲止。因而,思路有了,咱們立刻來實現如下。這個消息監視的操做也是一個全局操做,在系統啓動時就會一直運行,因而它也應該寫在Application_Start這個全局起始事件裏邊,因而按照標準的配置寫法,咱們在Application_Start中添加了以下代碼:MessageQueueConfig.RegisterExceptionLogQueue();

複製代碼
        protected void Application_Start()
        {
            AreaRegistration.RegisterAllAreas();

            WebApiConfig.Register(GlobalConfiguration.Configuration);
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);

            //自定義事件註冊
            MessageQueueConfig.RegisterExceptionLogQueue();
        }
複製代碼

  那麼,這個MessageQueueConfig.RegisterExceptionLogQueue()又是怎麼寫的呢?

複製代碼
  public class MessageQueueConfig
    {
        public static void RegisterExceptionLogQueue()
        {
            string logFilePath = HttpContext.Current.Server.MapPath("/App_Data/");
            //經過線程池開啓線程,不停地從隊列中獲取異常信息並將其寫入日誌文件
            ThreadPool.QueueUserWorkItem(o =>
            {
                while (true)
                {
                    try
                    {
                        if (MyExceptionFilterAttribute.ExceptionQueue.Count > 0)
                        {
                            Exception ex = MyExceptionFilterAttribute.ExceptionQueue.Dequeue(); //從隊列中出隊,獲取異常對象
                            if (ex != null)
                            {
                                //構建完整的日誌文件名
                                string logFileName = logFilePath + DateTime.Now.ToString("yyyy-MM-dd") + ".txt";
                                //得到異常堆棧信息
                                string exceptionMsg = ex.ToString();
                                //將異常信息寫入日誌文件中
                                File.AppendAllText(logFileName, exceptionMsg, Encoding.Default);
                            }
                        }
                        else
                        {
                            Thread.Sleep(1000); //爲避免CPU空轉,在隊列爲空時休息1秒
                        }
                    }
                    catch (Exception ex)
                    {
                        MyExceptionFilterAttribute.ExceptionQueue.Enqueue(ex);
                    }
                }
            }, logFilePath);
        }
    }
複製代碼

  如今,讓咱們來看看這段代碼:

  ①首先定義Log文件存放的文件夾目錄,這裏咱們通常放到App_Data裏邊,由於放到這裏邊外網是沒法訪問到的,能夠防止下載操做;

  ②其次經過線程池ThreadPool開啓一個線程,不停地監聽消息隊列裏邊的待辦事項個數,若是個數>0,則進行出隊(FIFO,先入隊的先出隊)操做。這裏主要是取出具體的異常實例對象,並將異常的具體堆棧信息追加寫入到指定命名格式的文件中。

PS:許多應用程序建立的線程都要在休眠狀態中消耗大量時間,以等待事件發生。其餘線程可能進入休眠狀態,只被按期喚醒以輪詢更改或更新狀態信息。線程池經過爲應用程序提供一個由系統管理的輔助線程池使您能夠更爲有效地使用線程。關於線程池的更多信息請訪問:http://msdn.microsoft.com/zh-cn/library/system.threading.threadpool(v=VS.90).aspx

  ③若是該線程檢測到消息隊列中無待辦事項,則使用Thread.Sleep使線程「休息」一會,避免了CPU空轉(從理論上來講,CPU資源是很珍貴的,應該儘可能提升CPU的利用率)。

  (5)最後,咱們來看看效果如何?

  ①首先,高大上的VS捕捉到了異常-DividedByZeroException:

  ②按照咱們的全局異常處理過濾器,會將此異常記入隊列中,並返回HTTP 302重定向跳轉到自定義錯誤頁面:

  ③最後,打開App_Data文件夾,查看日誌文件:

  到這裏時,咱們已經藉助消息隊列的思想完成了一個自定義的異常日誌隊列服務。但也許有朋友會說,這個跟Redis有關係麼?異常日誌不都是用Log4Net麼?不要着急,後邊咱們就會使用Redis+Log4Net來重構這個異常日誌隊列服務,不要走開,咱們不得插播廣告哦,麼麼嗒!

3、使用Redis重構異常日誌隊列

  (1)第一步,開啓Redis的服務,這裏咱們使用命令開啓Redis服務(以前已經將Redis註冊到了Windows系統服務中了嘛,麼麼嗒):net start redis-instance,固然,也能夠經過在Windows服務列表中開啓。

  (2)第二步,在剛剛的版本1的Demo中新建一個文件夾,命名爲Lib,將ServiceStack.Redis的dll和Log4Net的dll都拷貝進去。而後,在引用中添加對Lib文件夾中全部dll的引用。

  (3)第三步,重寫MyExceptionFilterAttribute這個全局異常信息過濾器。這裏使用到了Redis的客戶端鏈接池,每次鏈接時都是從池中取,不須要每次都建立,節省了時間和資源,提升了資源利用率。對於,多臺Redis服務器組成的集羣而言,這裏須要指定多個形如 IP地址:端口號 的字符串數組。

複製代碼
    public class MyExceptionFilterAttribute : HandleErrorAttribute
    {
        //版本2:使用Redis的客戶端管理器(對象池)
        public static IRedisClientsManager redisClientManager = new PooledRedisClientManager(new string[] 
        {
            //若是是Redis集羣則配置多個{IP地址:端口號}便可
            //例如: "10.0.0.1:6379","10.0.0.2:6379","10.0.0.3:6379"
            "127.0.0.1:6379"
        });
        //從池中獲取Redis客戶端實例
        public static IRedisClient redisClient = redisClientManager.GetClient();

        public override void OnException(ExceptionContext filterContext)
        {
            //將異常信息入隊
            redisClient.EnqueueItemOnList("ExceptionLog", filterContext.Exception.ToString());
            //跳轉到自定義錯誤頁
            filterContext.HttpContext.Response.Redirect("~/Common/CommonError.html");

            base.OnException(filterContext);
        }
    }
複製代碼

  (4)第四步,首先在Web.config中加入Log4Net的詳細配置。

複製代碼
<configSections>
    <!-- For more information on Entity Framework configuration, visit http://go.microsoft.com/fwlink/?LinkID=237468 -->
    <section name="entityFramework" type="System.Data.Entity.Internal.ConfigFile.EntityFrameworkSection, EntityFramework, Version=4.4.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" requirePermission="false" />
    <!-- Log4Net配置聲明 -->
    <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler,log4net"/>
  </configSections>
  <!-- Log4Net具體配置 -->
  <log4net>
    <!-- OFF, FATAL, ERROR, WARN, INFO, DEBUG, ALL -->
    <!-- Set root logger level to ERROR and its appenders -->
    <root>
      <level value="ALL"/>
      <appender-ref ref="SysAppender"/>
    </root>
    <!-- Print only messages of level DEBUG or above in the packages -->
    <logger name="WebLogger">
      <level value="DEBUG"/>
    </logger>
    <appender name="SysAppender" type="log4net.Appender.RollingFileAppender,log4net" >
      <param name="File" value="App_Data/" />
      <param name="AppendToFile" value="true" />
      <param name="RollingStyle" value="Date" />
      <param name="DatePattern" value="&quot;Logs_&quot;yyyyMMdd&quot;.txt&quot;" />
      <param name="StaticLogFileName" value="false" />
      <layout type="log4net.Layout.PatternLayout,log4net">
        <!--<param name="ConversionPattern" value="%d [%t] %-5p %c - %m%n" />-->
        <param name="ConversionPattern" value="記錄時間:%date %n線程ID: [%thread] %n日誌級別:%-5level %n出錯類:%logger property: [%property{NDC}] - %n錯誤描述:%message%newline %n" />
        <param name="Header" value="-------------------------------------------------------header-----------------------------------------------------------&#13;&#10;" />
        <param name="Footer" value="-------------------------------------------------------footer-----------------------------------------------------------&#13;&#10;" />
      </layout>
    </appender>
    <appender name="consoleApp" type="log4net.Appender.ConsoleAppender,log4net">
      <layout type="log4net.Layout.PatternLayout,log4net">
        <param name="ConversionPattern" value="%d [%t] %-5p %c - %m%n" />
      </layout>
    </appender>
  </log4net>
複製代碼

PS:Log4Net是用來記錄日誌的一個經常使用組件(Log4J的移植版本),能夠將程序運行過程當中的信息輸出到一些地方(文件、數據庫、EventLog等)。因爲Log4Net不是本篇博文介紹的重點,因此對Log4Net不熟悉的朋友,請在博客園首頁搜索:Log4Net,瀏覽其詳細的介紹。

  其次,在App_Start文件夾中添加一個類,取名爲LogConfig,定義一個靜態方法:RegisterLog4NetConfigure,具體代碼只有一行,實現了Log4Net配置的初始化操做。

複製代碼
    public class LogConfig
    {
        public static void RegisterLog4NetConfigure()
        {
            //獲取Log4Net配置信息(配置信息定義在Web.config文件中)
            log4net.Config.XmlConfigurator.Configure();
        }
    }
複製代碼

  最後,在Global.asax中的Application_Start方法中添加一行代碼,註冊Log4Net的配置:

複製代碼
        protected void Application_Start()
        {
            AreaRegistration.RegisterAllAreas();

            WebApiConfig.Register(GlobalConfiguration.Configuration);
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);


            //自定義事件註冊
            MessageQueueConfig.RegisterExceptionLogQueue();
            LogConfig.RegisterLog4NetConfigure();
        }
複製代碼

  (5)第五步,改寫MessageQueueConfig中的RegisterExceptionLogQueue方法。這裏就再也不須要從預置類型Queue中取任務了,而是Redis中取出任務出隊進行相應處理。這裏,咱們使用了Log4Net進行異常日誌的記錄工做。PS:注意在代碼頂部添加對log4net的引用:using log4net;

複製代碼
      public static void RegisterExceptionLogQueue()
        {
            //經過線程池開啓線程,不停地從隊列中獲取異常信息並將其寫入日誌文件
            ThreadPool.QueueUserWorkItem(o =>
            {
                while (true)
                {
                    try
                    {
                        if (MyExceptionFilterAttribute.redisClient.GetListCount("ExceptionLog") > 0)
                        {
                            //從隊列中出隊,獲取異常對象
                            string errorMsg = MyExceptionFilterAttribute.redisClient.DequeueItemFromList("ExceptionLog");
                            if (!string.IsNullOrEmpty(errorMsg))
                            {
                                //使用Log4Net寫入異常日誌
                                ILog logger = LogManager.GetLogger("Log");
                                logger.Error(errorMsg);
                            }
                        }
                        else
                        {
                            Thread.Sleep(1000); //爲避免CPU空轉,在隊列爲空時休息1秒
                        }
                    }
                    catch (Exception ex)
                    {
                        MyExceptionFilterAttribute.redisClient.EnqueueItemOnList("ExceptionLog", ex.ToString());
                    }
                }
            });
        }
複製代碼

   (6)最後一步,調試驗證是否能正常寫入App_Data文件的日誌中,發現寫入的異常日誌以下,格式好看,信息詳細,圓滿完成了咱們的目的。

4、小結

  使用消息隊列將調用異步化,能夠改善網站系統的性能:消息隊列具備很好的削峯做用,即經過異步處理,將短期高併發產生的事務消息存儲在消息隊列中,從而削平高峯期的併發事務。在電商網站的促銷活動中,合理使用消息隊列,能夠有效地抵禦促銷活動剛開始大量涌入的訂單對系統形成的衝擊。本文使用消息隊列的思想,藉助Redis+Log4Net完成了一個超簡單的異常日誌隊列的應用案例,能夠有效地解決在多線程操做中對日誌文件的併發操做帶來的一些問題。一樣地,藉助消息隊列的思想,咱們也能夠完成對數據庫的高併發的消息隊列方案。因此,麻雀雖小五臟俱全,理解好了這個案例,相信對咱們這些菜鳥碼農是有所裨益的。一樣,也請大牛們一笑而過,多多指教菜鳥們一步一步地提升,謝謝了!後邊,咱們會探索一下Redis的集羣、主從複製,以及在VMWare中創建幾臺虛擬機來構建主從結構,並使用Redis記錄網站中重要的Session會話對象,或者是電商項目中常見的商品類目信息等。可是,本人資質尚淺,而且都是一些初探性質的學習,若有錯誤和不當,還請各位園友多多指教!

參考文獻

(1)傳智播客.Net學院王承偉,數據優化技術之Redis公開課,http://bbs.itcast.cn/thread-26525-1-1.html

(2)Sanfilippo/賈隆譯,《幾點建議,讓Redis在你的系統中發揮更大做用》,http://database.51cto.com/art/201107/276333.htm

(3)NoSQLFan,《Redis做者談Redis應用場景》,http://blog.nosqlfan.com/html/2235.html

(4)善心如水,《C#中使用Log4Net記錄日誌》,http://www.cnblogs.com/wangsaiming/archive/2013/01/11/2856253.html

(5)逆心,《ServiceStack.Redis之IRedisClient》,http://www.cnblogs.com/kissdodog/p/3572084.html

(6)李智慧,《大型網站技術架構-核心原理與案例分析》,http://item.jd.com/11322972.html

附件下載

(1)版本1:使用預置類型的異常日誌隊列Demo,http://pan.baidu.com/s/1nt5G7Fj

(2)版本2:使用Redis+Log4Net的異常日誌隊列Demo,http://pan.baidu.com/s/1i3gMnnJ

 https://www.cnblogs.com/edisonchou/p/3825682.html

相關文章
相關標籤/搜索