[.NET領域驅動設計實戰系列]專題八:DDD案例:網上書店分佈式消息隊列和分佈式緩存的實現

原文: [.NET領域驅動設計實戰系列]專題八:DDD案例:網上書店分佈式消息隊列和分佈式緩存的實現

1、引言

   在上一專題中,商家發貨和用戶確認收貨功能引入了消息隊列來實現的,引入消息隊列的好處能夠保證消息的順序處理,而且具備良好的可擴展性。可是上一專題消息隊列是基於內存中隊列對象來實現,這樣實現有一個弊端,就是一旦服務重啓或出現故障時,此時消息隊列中的消息會丟失,而且也記錄不了日誌。因此就會出現,商家發貨成功後,用戶並無收到郵件通知,而且也沒有日誌讓咱們發現是否發送了郵件通知。爲了解決這個問題,就須要引入一種可恢復的消息隊列。目前有不少開源的消息隊列都支持可恢復的,例如TibcoEms.net等。然而,微軟的MSMQ也是支持這種特性的。而且MSMQ還支持分佈式部署,關於MSMQ更多內容能夠參考:http://www.cnblogs.com/zhili/p/MSMQ.htmlhtml

   在本專題中將介紹爲網上書店案例引入分佈式消息隊列和分佈式緩存的實現。git

2、分佈式消息隊列的實現

   MSMQ的實現原理是:消息的發送者把本身想要發送的信息放入一個容器,而後把它保存到一個系統公用空間的消息隊列中,本地或異地的消息接收程序再從該隊列中取出發給它的消息進行處理。因此,即便服務器忽然重啓,消息也會存在於系統公用空間的消息隊列中,待服務器從新啓動後,能夠繼續接受消息進行處理,從而解決上一專題存在的問題。另外,上一專題的消息隊列只能被用在當前服務器中,而MSMQ支持分佈式部署,不一樣機器均可以對MSMQ進行接收消息來處理,此時MSMQ起到一箇中間件的做用。github

  在爲網上書店引入分佈式消息隊列以前,讓咱們先理一下實現思路:算法

  • 上一專題中把發貨事件和收貨事件發佈到EventBus中,而此時須要用MsmqEventBus來替代EventBus。而MsmqEventBus的實現就很簡單了,徹底能夠參考EventBus來實現,只是此時消息並非進入Queue對象中,而是經過MessageQueue對象發送到系統的消息隊列中。
  • 而Commit方法即從系統的消息隊列中出隊來得到消息。再得到消息的處理器時,與上一專題的實現有點不一樣,由於把事件對象發送到消息隊列時,須要先把事件對象先序列化爲Message對象再放入消息隊列中,而出隊的也是消息對象,而不是上一專題中的發貨事件對象。因此此時須要把出隊的消息對象反序列化爲對應的事件對象。

   有了上面的實現思路,接下來讓咱們一塊兒看看MsmqEventBus的具體實現代碼吧。數據庫

public class MsmqEventBus : DisposableObject, IEventBus
    {
             public void Publish<TMessage>(TMessage message) where TMessage : class, IEvent
        {
            // 將消息放入Message中Body屬性進行序列化發送到消息隊列中
            var msmqMessage = new Message(message) { Formatter = new XmlMessageFormatter(new[] { message.GetType() }), Label = message.GetType().ToString()};
            _messageQueue.Send(msmqMessage);
            _committed = false;
        }

        public void Publish<TMessage>(IEnumerable<TMessage> messages) where TMessage : class, IEvent
        {
            messages.ToList().ForEach(m =>
            {
                _messageQueue.Send(m);
                _committed = false;
            });
        }

        public void Commit()
        {
            if (this._useInternalTransaction)
            {
                using (var transaction = new MessageQueueTransaction())
                {
                    try
                    {
                        transaction.Begin();
                        var message = _messageQueue.Receive();
                        if (message != null)
                        {
                            message.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                            var evntType = ConvertStringToType(message.Body.ToString());
                            var method = _publishMethod.MakeGenericMethod(evntType);
                            var evnt = Activator.CreateInstance(evntType);
                            method.Invoke(_aggregator, new object[] { evnt });

                            transaction.Commit();
                        }
                    }
                    catch
                    {
                        transaction.Abort();
                        throw;
                    }
                }
            }
            else
            {
                // 從msmq消息隊列中出隊,此時得到的對象是消息對象
                var message = _messageQueue.Receive();
                if (message != null)
                {
                    // 指定反序列化的對象,因爲咱們以前把對應的事件類型保存在MessageQueue中的Label屬性
                    // 因此此時能夠經過Label屬性來得到目標序列化類型
                    message.Formatter = new XmlMessageFormatter(new[] { ConvertStringToType(message.Label) });
                    
                    // 這樣message.Body得到就是對應的事件對象,後面的處理邏輯就和EventBus同樣了
                    var evntType =message.Body.GetType();
                    var method = _publishMethod.MakeGenericMethod(evntType);
                    method.Invoke(_aggregator, new object[] { message.Body });
                }
            }

            _committed = true;
        }
    }

  結合上面代碼的註釋和前面實現思路的介紹,相信理解MsmqEventBus應該沒什麼問題了。接下來,咱們須要在配置文件中指定EventBus爲MsmqEventBus類,另外須要在你本地專有隊列中建立"OnlineStoreQueue"隊列來接受消息。具體的配置文件修改成:編程

 <!--Event Bus-->
      <!--<register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events" mapTo="OnlineStore.Events.Bus.EventBus, OnlineStore.Events">
        <lifetime type="singleton" />
      </register>-->
      
      <!--注入MsmqEventBus-->
      <register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events"
                mapTo="OnlineStore.Events.Bus.MsmqEventBus, OnlineStore.Events">
        <lifetime type="singleton" />
        <constructor>
          <param name="path" value=".\Private$\OnlineStoreQueue" />
        </constructor>
      </register>
    </container>

  到此,分佈式消息隊列的實現就完成了,具體分佈式消息隊列的實現效果和上一專題使用EventBus的效果是同樣的,這裏就再也不貼圖了,你們能夠自行下載源碼查看。緩存

3、緩存的實現

  在實際開發過程當中,緩存的實現是必不可少的,對於已經查詢過的數據能夠直接從緩存中進行讀取返回給調用者,利用緩存不但能夠加快響應速度,還能減輕數據庫服務器的壓力。在大型電子商務網站中,緩存的實現更是必不可少的功能。然而緩存的實現也有兩種,一種是分佈式緩存,另外一種本地緩存。在大型網站中,更多實現的是分佈式緩存,對於一些少用戶的企業系統,可能纔會使用到本地緩存。因此在本專題中,將在網上書店案例中對這兩種緩存分別進行實現。服務器

3.1 本地緩存的實現

  首先,咱們來介紹本地緩存的實現。因爲這裏須要實現兩種緩存,根據面向接口編程原則,咱們天然首先須要定義一個緩存接口,而後這兩種具體緩存都須要實現該接口。針對緩存接口,無非是緩存數據的添加,移除,更新等操做,因此緩存接口的定義以下所示:分佈式

 // 緩存接口的定義
    public interface ICacheProvider
    {
        /// <summary>
        /// 向緩存中添加一個對象
        /// </summary>
        /// <param name="key">緩存的鍵值</param>
        /// <param name="valueKey">緩存值的鍵值</param>
        /// <param name="value">緩存的對象</param>
        void Add(string key, string valueKey, object value);
        void Update(string key, string valueKey, object value);
        object Get(string key, string valueKey);
        void Remove(string key);
        bool Exists(string key);
        bool Exists(string key, string valueKey);
    }

  在介紹本地緩存的實現以前,讓咱們先來思考下本地緩存的實現思路——就是在本地緩存類中定義一個字典對象,添加緩存就是往該字典插入鍵值對而已,其中key就是緩存數據對應的鍵值,value就是真正的緩存數據,若是緩存在字典中存在的話,就直接根據鍵值查找出緩存數據進行返回。ide

  然而網上書店的本地緩存是基於Enterprise Library Caching庫來實現的,其實現思路和我以前介紹的思路也是同樣的,只不過此時字典對象不須要咱們在類中定義,此時直接用Enterprise Library Caching庫中定義的就好。有了上面的分析,本地緩存的實現理解起來也就不那麼難了,具體本地緩存的實現代碼以下所示:

// 表示基於Microsoft Patterns & Practices - Enterprise Library Caching Application Block的緩存機制的實現
    // 該類簡單理解爲對Enterprise Library Caching中的CacheManager封裝
    // 該緩存實現不支持分佈式緩存,更多信息參考: 
    // http://stackoverflow.com/questions/7799664/enterpriselibrary-caching-in-load-balance 
    public class EntLibCacheProvider : ICacheProvider
    {
        // 得到CacheManager實例,該實例的註冊經過cachingConfiguration進行註冊進去的,具體看配置文件
        private readonly ICacheManager _cacheManager = CacheFactory.GetCacheManager();

        #region ICahceProvider

        public void Add(string key, string valueKey, object value)
        {
            Dictionary<string, object> dict = null;
            if (_cacheManager.Contains(key))
            {
                dict = (Dictionary<string, object>) _cacheManager[key];
                dict[valueKey] = value;
            }
            else
            {
                dict = new Dictionary<string, object> { { valueKey, value }};
            }

            _cacheManager.Add(key, dict);
        }

        public void Update(string key, string valueKey, object value)
        {
            Add(key, valueKey, value);
        }

        public object Get(string key, string valueKey)
        {
            if (!_cacheManager.Contains(key)) return null;
            var dict = (Dictionary<string, object>)_cacheManager[key];
            if (dict != null && dict.ContainsKey(valueKey))
                return dict[valueKey];
            else
                return null;
        }

        // 從緩存中移除對象
        public void Remove(string key)
        {
            _cacheManager.Remove(key);
        }

        // 判斷指定的鍵值的緩存是否存在
        public bool Exists(string key)
        {
            return _cacheManager.Contains(key);
        }

        // 判斷指定的鍵值和緩存鍵值的緩存是否存在
        public bool Exists(string key, string valueKey)
        {
            return _cacheManager.Contains(key) &&
               ((Dictionary<string, object>)_cacheManager[key]).ContainsKey(valueKey);
        }
        #endregion 
    }

  到此,網上書店案例中本地緩存的實現就完成了。因爲本地緩存不支持分佈式部署,全部的緩存都存在於單獨緩存服務器中,然而,針對一些大型網站來講,這樣的實現並不適合,由於在大型網站中,須要經過多個緩存服務進行集羣,須要使得緩存均勻分佈在集羣中的緩存服務器中。此時就須要引入分佈式緩存的實現。下面讓咱們具體看看分佈式緩存如何在該案例中實現。

3.2 分佈式緩存的實現

  分佈式緩存能夠經過具體的算法把緩存均勻地分佈在集羣中緩存服務器中,從而用戶請求的不一樣數據能夠路由到對應的緩存服務器中進行添加、更新或得到。分佈式緩存的實現有不少種方式,能夠利用Memcached和Redis開源庫來實現。然而,微軟的Windows Azure也提供了分佈式緩存的實現,本案例中分佈式緩存就是基於Windows Azure的。在對分佈式緩存實現以前,須要先下載對應的dll,而後再在項目中進行引用。須要下載的dll已經包含在項目根目錄下的libs文件夾下,具體須要下載的程序集截圖以下所示:

 

  而後在基礎設施層引入這些程序集,以前就能夠去實現基於Windows Azure的分佈式緩存了。具體的實現代碼以下所示:

// 分佈式緩存,該類是對微軟分佈式緩存服務的封裝
    // 在該案例中沒用用到該緩存,可是提供在這裏讓你們明白微軟的分佈式緩存實現,並非只有memcached和Redis
    // Redis參考:http://www.cnblogs.com/ceecy/p/3279407.htmlhttp://blog.csdn.net/suifeng3051/article/details/23739295
    // 關於微軟分佈式緩存更多介紹參考:http://www.cnblogs.com/shanyou/archive/2010/06/29/AppFabricCaching.html 
    // 和http://www.cnblogs.com/mlj322/archive/2010/04/05/1704624.html
    public class AppfabricCacheProvider : ICacheProvider
    {
        private readonly DataCacheFactory _factory = new DataCacheFactory();
        private readonly DataCache _cache;

        public AppfabricCacheProvider()
        {
            this._cache = _factory.GetDefaultCache();
        }

        #region ICacheProvider Members
        public void Add(string key, string valueKey, object value)
        {
            // DataCache中不包含Contain方法,全部用Get方法來判斷對應的key值是否在緩存中存在
            var val = (Dictionary<string, object>)_cache.Get(key);
            if (val == null)
            {
                val = new Dictionary<string, object> {{ valueKey, value}};
                _cache.Add(key, val);
            }
            else
            {
                if (!val.ContainsKey(valueKey))
                    val.Add(valueKey, value);
                else
                    val[valueKey] = value;

                _cache.Put(key, val);
            }
        }

        public void Update(string key, string valueKey, object value)
        {
            Add(key, valueKey, value);
        }

        public object Get(string key, string valueKey)
        {
            return Exists(key, valueKey) ? ((Dictionary<string, object>)_cache.Get(key))[valueKey] : null;
        }

        public void Remove(string key)
        {
            _cache.Remove(key);
        }

        public bool Exists(string key)
        {
            return _cache.Get(key) != null;
        }

        public bool Exists(string key, string valueKey)
        {
            var val = _cache.Get(key);
            if (val == null)
                return false;
            return ((Dictionary<string, object>)val).ContainsKey(valueKey);
        }

        #endregion 
    }

  經過上面的步驟,分佈式緩存的實現就完成了。其實,分佈式緩存和本地緩存不一樣之處就在於:分佈式緩存支持對應的算法能夠把緩存存放在不一樣的服務器上,而本地緩存只能存在於本地,而不能跨機器分佈。因此對於大型網站,分佈式緩存纔是最好的選擇,因爲分佈式緩存的實現和部署,無疑會增長開發和維護成本,因此對於一些小型系統(指定是單數據庫服務器系統),能夠考慮使用本地緩存。

  在本案例中,因爲本人沒有Windows Azure環境,因此對於分佈式緩存的實現也不能進行測試,因此本案例中使用的仍是本地緩存。要使緩存生效,還須要對配置文件進行修改。具體配置文件修改成:

 <unity xmlns="http://schemas.microsoft.com/practices/2010/unity">
    <sectionExtension type="Microsoft.Practices.Unity.InterceptionExtension.Configuration.InterceptionConfigurationExtension, Microsoft.Practices.Unity.Interception.Configuration" />
    <container>
      <extension type="Interception" />
      
      <!--Cache Provider--> <register type="OnlineStore.Infrastructure.Caching.ICacheProvider, OnlineStore.Infrastructure" mapTo="OnlineStore.Infrastructure.Caching.EntLibCacheProvider, OnlineStore.Infrastructure" />
<!--........-->
 </container>
</unity>

  其實,經過上面的配置以後,緩存仍是不能生效的,由於咱們通常把緩存放在得到數據方法以前進行調用,在用戶對得到數據方法調用以前,首先從緩存中進行查找,若是存在,則直接返回緩存中的數據給調用者就能夠了,若是不存在再調用得到數據方法從數據庫中讀取,讀取成功後添加到緩存中再返回給調用者。既然要在方法調用前來查找緩存,從中你是否想到了什麼呢?不錯,就是面向切面編程,即AOP。因此要讓緩存生效,在該案例中還須要支持AOP。至於AOP的支持,我將會在下一專題進行介紹。

4、總結

   到這裏,本專題的內容就結束了,正如前面所說的,在下一專題,我將在網上書店案例中引入對AOP的支持。

  本專題全部源碼下載地址:https://github.com/lizhi5753186/OnlineStore_Second/

相關文章
相關標籤/搜索