分佈式消息總線,基於.NET Socket Tcp的發佈-訂閱框架之離線支持,附代碼下載

1、分佈式消息總線以及基於Socket的實現

     在前面的分享一個分佈式消息總線,基於.NET Socket Tcp的發佈-訂閱框架,附代碼下載一文之中給你們分享和介紹了一個極其簡單也很是容易上的基於.NET Socket Tcp 技術實現的分佈消息總線,也是一個簡單的發佈訂閱框架:html

image

    而且以案例的形式爲你們演示瞭如何使用這個分佈式消息總線架構發佈訂閱架構模式的應用程序,在獲得各位同仁的反饋的同時,你們也很是想了解訂閱者離線的狀況,即支持離線構發佈訂閱框架。git

2、離線架構

     不一樣於訂閱者、發佈者都同時在線的狀況,支持訂閱者離線,架構將有所變化,以下圖所示:github

image

     也會比原先的結構將更加複雜,其中須要處理如下兩個關鍵點:sql

     1)訂閱者的持久化存儲。mongodb

     2)訂閱者離線以後其所訂閱消息的持久存儲。數據庫

3、解決方案

     爲解決消息總線的離線支持機制,咱們在Socket 框架之中增長了一個接口ISubscribeStorager服務器

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace EAS.Messages
   7: {   
   8:     /// <summary>
   9:     /// 消息訂閱存儲接口。
  10:     /// </summary>
  11:     public interface ISubscribeStorager
  12:     {
  13:         /// <summary>
  14:         /// 持久化訂閱。
  15:         /// </summary>
  16:         /// <param name="subscriber">訂閱者。</param>
  17:         /// <param name="topic">消息主題。</param>
  18:         void Subscribe(string subscriber, string topic);
  19:  
  20:         /// <summary>
  21:         /// 持久化退訂。
  22:         /// </summary>
  23:         /// <param name="subscriber">訂閱者。</param>
  24:         /// <param name="topic">消息主題。</param>
  25:         void Unsubscribe(string subscriber, string topic);
  26:  
  27:         /// <summary>
  28:         /// 裝載訂閱信息。
  29:         /// </summary>
  30:         /// <returns>系統之中的訂閱清單。</returns>
  31:         List<SubscribeItem> LoadSubscribes();
  32:  
  33:         /// <summary>
  34:         /// 寫入消息。
  35:         /// </summary>
  36:         /// <param name="subscriber">訂閱者。</param>
  37:         /// <param name="message">消息對象。</param>
  38:         void Write(string subscriber, QueueMessage message);
  39:  
  40:         /// <summary>
  41:         /// 讀消息。
  42:         /// </summary>
  43:         /// <param name="subscriber">訂閱者。</param>
  44:         /// <param name="message">消息對象。</param>
  45:         /// <returns>成功讀取返回true,不然返回false。</returns>
  46:         bool Read(string subscriber, out QueueMessage message);
  47:     }
  48: }

     ISubscribeStorager共提供持久化訂閱持久化消息存儲共五個函數,其中:架構

     LoadSubscribes:服務端初始化時讀取全部的離線訂閱關係,即那個訂閱都訂閱那那個主題。app

     Subscribe:持久化訂閱者,當訂閱才上線訂閱消息時,持久化訂閱關係,供離線檢測之用。框架

     Unsubscribe:持久化取消訂閱,當訂閱者退訂消息時,從持久化訂閱關係之中刪除。

     Write:當訂閱者離線時,把訂閱消息寫入持久化存儲。

     Read:當離線訂閱者上線時,從持久存儲之中讀取一條消息向其發送。

     ISubscribeStorager:能夠選擇本身實現這個接口,以創建知足本身規則的離線存儲機制,固然在AgileEAS.NET SOA 中間件之中提供了兩種離線存儲機制,存儲於數據庫和存儲於MSMQ,下面向你們介紹一下這兩種內置實現。

4、兩種內置離線存儲機制

     在AgileEAS.NET SOA 中間件平臺之中提供了兩個ISubscribeStorager的實現,基於數據庫的離線訂閱存儲實現EAS.Messages.DbSubscribeStorager和基於MSMQ的離線訂閱存儲實現EAS.Messages.MsmqSubscribeStorager

     EAS.Messages.DbSubscribeStorager存儲訂閱關係在messageSubscribe.Config文件之中,消息存儲在關係數據庫SOA_SUBSCRIBEEVENTS表之中,使用前必需要創建相應的表結構,如下是SQL Server的DDL腳本:

   1: CREATE TABLE [SOA_SUBSCRIBEEVENT](
   2:     [GUID] [varchar](36) NOT NULL,
   3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,
   4:     [TOPIC] [nvarchar](128) NOT NULL,
   5:     [BODY] [image] NULL,
   6:     [FCTIME] [datetime] NOT NULL,
   7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 
   8: (
   9:     [GUID] ASC
  10: )
  11: ) 

      目前理論上支持SQLServer 、Mysql、ORACLE、Sqlite四種數據庫結構,具體建表腳本請自行參考相應資料書寫,也可使用AgileEAS.NET SOA中間件所提供的數據庫初始化工具建立。

      EAS.Messages.MsmqSubscribeStorager存儲訂閱關係在messageSubscribe.Config文件之中,消息存儲Msmq消息隊列之中,使用以前請確保機器上安裝了MSMQ消息對列。

5、關於自定義實現ISubscribeStorager

     有興趣的朋友能夠自定義實現接口ISubscribeStorager,這樣就能夠按本身的規則進行存儲,好比把離線消息存儲到mongodb、Redis、或者直接存儲在文件之中,或者其餘更多的實現規則,在此就不一一介紹,若有相關興趣,請聯繫做者,如確有必要須要給在家介紹一下如何實現,將會另開一文本介紹如何自定義實現ISubscribeStorager接口。

6、改進在線例子支持離線

     仍是跟上次同樣,以案例爲在家展現一下怎麼進行離線消息,就不從新開始例子,對原有例子作一些改進,改進後例子以下:

image

     其中在原有項目的基礎上增長了:Demo.Subscriber1和Demo.Subscriber2項目,其項目配置代碼、配置文件基本上同Demo.Subscriber同樣,其中惟一的差異在於,Demo.Subscriber1和Demo.Subscriber2向服務器提交訂閱的時候都增長一個另friendName參數,其使用IMessageBus接口的如下訂閱函數:

   1: /// <summary>
   2: /// 訂閱消息。
   3: /// </summary>
   4: /// <param name="subscriber">訂閱者。</param>
   5: /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
   6: /// <param name="topic">主題。</param>
   7: /// <param name="notifyHandler">訂閱通知。</param>
   8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);

                Demo.Publisher項目爲發佈者代碼。

                Demo.Subscriber項目爲訂閱者代碼。

                Demo.Server項目爲服務端代碼。

     Demo.Subscriber1項目之中,其Program.cs代碼以下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Windows.Forms;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber1
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber1");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

     其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在訂閱消息的時候給了一個friendName爲Subscriber1,Demo.Subscriber2與Demo.Subscriber1項目的惟一的差異就是此處爲Subscriber2.

     咱們使用內置的EAS.Messages.DbSubscribeStorager,則不須要修改服務端的代碼,只須要修改服務端的配置文件以下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--數據庫鏈接-->
  12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">
  13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />
  14:       </object>
  15:       <!--數據訪問器-->
  16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">
  17:         <property name="DbProvider" type="object" value="DbProvider"/>
  18:         <property name="Language" type="object" value="TSqlLanguage"/>
  19:       </object>
  20:       <!--ORM訪問器-->
  21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">
  22:         <property name="DataAccessor" type="object" value="DataAccessor"/>
  23:       </object>
  24:       <!--查詢語言-->
  25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>
  26:       <!--消息持久化-->
  27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>
  28:       <!--日誌管理-->
  29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  31:       </object>
  32:     </objects>
  33:   </eas>
  34: </configuration>

     在配置文件的IOC配置之中咱們配置了消息存儲對象以及其所依賴的數據庫訪問對象、Linq查詢語言表達式,另外須要說明的是,咱們須要把配置文件之中所涉及的 EAS.Data.dll、EAS.SOA.BootStrap.dll複製到編譯輸出Publish,這兩個文件能夠從AgileEAS.NET SOA 中間件平臺發佈包之中尋找,本案例的下載壓碎包之中會包括這兩個文件。

     有關於基於Msmq的配置,只須要修改配置文件以下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--消息持久化-->
  12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>
  13:       <!--日誌管理-->
  14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  16:       </object>
  17:     </objects>
  18:   </eas>
  19: </configuration>

     到此爲止,全部代碼均已完成,是否是很簡單,接下來,咱們跑起來驗證一下效果。

7、驗證效果

     咱們在編譯輸入目錄Publish下先啓動Demo.Server.exe,再各啓動Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再啓動一個Demo.Publisher.exe,在Demo.Publisher.exe控制檯按回車鍵:

N_[]_C%GTD_$0[KL}}B~E$A

目前程序三個訂閱者都是在線的,Demo.Publisher發佈了三條消息,三個訂閱者都收到了三條消息,那麼咱們關閉Demo.Subscriber2以後再由Demo.Publisher發佈兩條消息:

R(OM90FW6B0WYO6R)0MQ7D4

而後咱們再啓動Demo.Subscriber2,看是否還能收到其離線以後由Demo.Publisher發佈的兩條消息:

ZE8XA`V4PE)5%F~QF}NO]0N

OK,到此爲止。

8、源代碼下載

     本程序的源代碼已上傳到服務器,請經過http://112.74.65.50/downloads/eas/Demo.Pub_Sub_Offline.rar進行下載,若是在開發過程之中想要了解更多有關Socket通訊框架以及更多AgileEAS.NET SOA中間件平臺的技術資源,請經過AgileEAS.NET SOA 網站:http://www.smarteas.net最新下載欄目進行下載。   

9、問題反饋

     麻煩你們在經過視頻進行學習的時候能及時把問題反饋給樓主,或者有什麼須要改進的一些建議都請向樓主直接反饋,如下是聯繫方式:

團隊網站:http://www.agilelab.cn

AgileEAS.NET網站:http://www.agileeas.net

官方博客:http://eastjade.cnblogs.com

github:https://github.com/agilelab/eas

樓主QQ:47920381

QQ羣:113723486(AgileEAS SOA 平臺)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平臺交流)/上限1000人

郵件:james@agilelab.cn,mail.james@qq.com,

電話:18629261335。

相關文章
相關標籤/搜索