在不少MIS項目之中都有這樣的需求,須要一個及時、高效的的通知機制,即好比當使用者A完成了任務X,就須要當即告知使用者B任務X已經完成,在一般的狀況下,開發人中都是在使用者B所使用的程序之中寫數據庫輪循代碼,這樣就會產品一個很嚴重的兩個問題,第一個問題是延遲,輪循機制要定時執行,必須會引發延遲,第二個問題是數據庫壓力過大,當進行高頻度的輪循會生產大量的數據庫查詢,而且若是有大量的使用者進行輪循,那數據庫的壓力就更大了。html
那麼在這個時間,就須要一套能支持發佈-訂閱模式的分佈式消息總線,那這個問題就能夠很好的解決了,好比採用一些成熟的消息總線進行實現,好比MSMQ或者採用好比開源的NServiceBus的發佈訂閱機制就能夠實現處理這種需求,其系統結構就會變成以下所示:git
本分佈式消息總線,目前普遍的被應用於分佈式緩存的更新通知,當在N百臺客戶短在使用緩存的過程之中,某個操做修改了緩存的數據,必須會致使其餘終端緩存的失效,那麼使用基於Socket的分佈式消息總線以後,咱們能夠作了修改了便可實時通知,作到緩存數據保持最新,再好比醫療應用之中的危急值管理,當發現檢驗、檢查危急值以後,須要及時通知病區啓動聲光報警系統等,提醒醫護工做人員及相關領導作出相應的措施,再好比應用於異構系統整合,當檢驗系統作出檢驗報告,經過消息總線進行發佈,HIS系統則即時會收到檢驗報告數據而實現系統的整合。github
目前可以實現發佈訂閱模式的開源產品很是之多,爲何還要製造輪子呢,其主要緣由有如下幾點數據庫
1)像NServiceBus這種東西基於MSMQ,在大量的發佈者-訂閱者的狀況下性能不佳。緩存
2)此類東西太過於龐大、不易使用和配置。服務器
3)學習成本太高。併發
那爲何要使用Socket技術進行實現呢,其主要緣由是有如下幾點:app
1)使用高效的Socket通訊技術,高效、支持更多的客戶端。框架
2)使用簡單,不須要定義太多額外的東西,只須要定義主題和消息便可使用。socket
目前本發佈訂閱框架是基於AgileEAS.NET SOA中間件平臺Socket框架實現的,有關於些Socket框架的技術細節請參考AgileEAS.NET SOA 中間件平臺.Net Socket通訊框架-介紹、AgileEAS.NET SOA 中間件平臺.Net Socket通訊框架-簡單例子-實現簡單的服務端客戶端消息應答、AgileEAS.NET SOA 中間件平臺.Net Socket通訊框架-完整應用例子-在線聊天室系統-下載配置、AgileEAS.NET SOA 中間件平臺.Net Socket通訊框架-完整應用例子-在線聊天室系統-代碼解析文章進行了解和學習。
目前本發佈訂閱框架並直接集成於AgileEAS.NET SOA Socket通訊框架之中而且隨其一併發佈,下面簡單介紹一下其API:
在本框架之中定義了一個消息總線接口IMessageBus:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using System.Collections;
6:
7: namespace EAS.Messages
8: {
9: /// <summary>
10: /// 消息總線接口定義。
11: /// </summary>
12: public interface IMessageBus : IDisposable
13: {
14: /// <summary>
15: /// 註冊發佈者。
16: /// </summary>
17: /// <param name="publisher">發佈者。</param>
18: void AddPublisher(string publisher);
19:
20: /// <summary>
21: /// 註冊發佈者。
22: /// </summary>
23: /// <param name="publisher">發佈者。</param>
24: /// <param name="topic">主題。</param>
25: void AddPublisher(string publisher, string topic);
26:
27: /// <summary>
28: /// 發佈一條消息到總線。
29: /// </summary>
30: /// <param name="topic">主題。</param>
31: /// <param name="message">發佈的消息。</param>
32: void Publish(string topic, object message);
33:
34: /// <summary>
35: /// 訂閱消息。
36: /// </summary>
37: /// <param name="subscriber">訂閱者。</param>
38: /// <param name="topic">主題。</param>
39: /// <param name="notifyHandler">訂閱通知。</param>
40: void Subscribe(object subscriber, string topic, MessageNotifyHandler notifyHandler);
41:
42: /// <summary>
43: /// 訂閱消息。
44: /// </summary>
45: /// <param name="subscriber">訂閱者。</param>
46: /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
47: /// <param name="topic">主題。</param>
48: /// <param name="notifyHandler">訂閱通知。</param>
49: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);
50:
51: /// <summary>
52: /// 訂閱消息。
53: /// </summary>
54: /// <param name="subscriber">訂閱者。</param>
55: /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
56: /// <param name="topic">主題。</param>
57: /// <param name="notifyHandler">訂閱通知。</param>
58: /// <param name="changedHandler">發佈者狀態變化委託。</param>
59: void Subscribe(object subscriber, string friendName, string topic, MessageNotifyHandler notifyHandler,PublisherSstatusChangedHandler changedHandler);
60:
61: /// <summary>
62: /// 退訂消息。
63: /// </summary>
64: /// <param name="subscriber">訂閱者。</param>
65: void Unsubscribe(object subscriber);
66:
67: /// <summary>
68: /// 退訂消息。
69: /// </summary>
70: /// <param name="subscriber">訂閱者。</param>
71: /// <param name="topic">主題。</param>
72: void Unsubscribe(object subscriber, string topic);
73:
74: /// <summary>
75: /// 退訂消息。
76: /// </summary>
77: /// <param name="subscriber">訂閱者。</param>
78: /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
79: /// <param name="topic">主題。</param>
80: void Unsubscribe(object subscriber, string friendName, string topic);
81: }
82: }
IMessageBus就基於Socket發佈訂閱消息總線的靈魂接口,也是基惟一的發佈者調用者功能入口,也就是說無論你是發佈者仍是訂閱者都須要調用這個接口,若是你是發佈者請調用IMessageBus接口的Publish方法向消息總線發佈消息,若是是你訂閱者請經過IMessageBus的訂閱方法進行訂閱,當你訂閱了某個主題以後,有發佈者發佈該主題的消息,你便可以收到消息並調用訂閱回調函數進行處理。
如今咱們開始一個簡單的應用消息總線的例子,本例子代碼解決方案由下圖4個項目組成:
其中:Demo.Messages項目定義發佈者、訂閱者所使用的消息對象和消息主題。
Demo.Publisher項目爲發佈者代碼。
Demo.Subscriber項目爲訂閱者代碼。
Demo.Server項目爲服務端代碼。
在Demo.Messages項目之中,咱們定義了一個消息Message:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using System.Xml.Serialization;
6:
7: namespace Demo.Messages
8: {
9: [Serializable]
10: public class Message
11: {
12: [XmlElement]
13: public Guid ID
14: {
15: get;
16: set;
17: }
18: }
19: }
消息Message很簡單,只有一個屬性ID,同時 還須要定義一個消息主題:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5:
6: namespace Demo.Messages
7: {
8: public class Topics
9: {
10: public static readonly string DEMO_TOPIC = "演示消息";
11: }
12: }
咱們定義了一個消息主題爲「演示消息」。
在Demo.Publisher項目之中,沒有太多額外的代碼,只有在Program.cs寫了如下簡單的調用代碼:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using EAS.Messages;
6:
7: namespace Demo.Publisher
8: {
9: class Program
10: {
11: static void Main(string[] args)
12: {
13: var container = EAS.Objects.ContainerBuilder.BuilderDefault();
14: var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
15: System.Console.WriteLine("Publisher");
16:
17: while (System.Console.ReadLine()!="exit")
18: {
19: var m = new Messages.Message { ID = Guid.NewGuid() };
20: bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);
21: System.Console.WriteLine(string.Format("Publish:{0}", m.ID));
22: }
23: }
24: }
25: }
從IOC容器獲取一個消息總線IMessageBus對象,並調用Publish函數發佈消息」bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);「。
固然了,使用了IOC容器,就離來開配置文件了,其App.config文件內容以下:
1: <?xml version="1.0" encoding="utf-8"?>
2: <configuration>
3: <configSections>
4: <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
5: </configSections>
6: <eas>
7: <objects>
8: <!--消息總線-->
9: <object name="MessageBus" assembly="EAS.MicroKernel" type="EAS.Sockets.Bus.SocketBus" LifestyleType="Singleton">
10: <property name="Url" type="string" value="socket.tcp://127.0.0.1:6606/"/>
11: </object>
12: </objects>
13: </eas>
14: </configuration>
在Demo.Subscriber項目之中,使用與Demo.Publisher如出一轍的配置文件,其Program.cs代碼以下:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using EAS.Messages;
6:
7: namespace Demo.Subscriber
8: {
9: class Program
10: {
11: static void Main(string[] args)
12: {
13: var container = EAS.Objects.ContainerBuilder.BuilderDefault();
14: var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
15: System.Console.WriteLine("Subscriber");
16:
17: bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
18: System.Console.ReadLine();
19: }
20:
21: static void MessageNotify(object m)
22: {
23: Demo.Messages.Message message = m as Demo.Messages.Message;
24: System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
25: }
26: }
27: }
其中代碼bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);:完成把消息訂閱到MessageNotify通知函數之中。
在Demo.Server項目之中,啓動服務器而且開始接收訂閱和發佈:
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using EAS.Sockets;
6:
7: namespace Demo.Server
8: {
9: class Program
10: {
11: static void Main(string[] args)
12: {
13: SocketServer socketServer = new SocketServer(128);
14: socketServer.Port = 6606;
15: socketServer.Logger = new EAS.Loggers.ConsoleLogger();
16: socketServer.Initialize();
17: System.Console.WriteLine("Server Starting...");
18: socketServer.StartServer();
19: System.Console.WriteLine("Server Startup!");
20: System.Console.ReadLine();
21: }
22: }
23: }
到此爲止,全部代碼均已完成,是否是很簡單,接下來,咱們跑起來驗證一下效果。
咱們在編譯輸入目錄Publish下先啓動Demo.Server.exe,再啓動兩個Demo.Subscriber.exe,再啓動一個Demo.Publisher.exe,在Demo.Publisher.exe控制檯按回車鍵:
OK,搞定。
本程序的源代碼已上傳到服務器,請經過http://112.74.65.50/downloads/eas/Demo.Pub_Sub.rar進行下載,若是在開發過程之中想要了解更多有關Socket通訊框架以及更多AgileEAS.NET SOA中間件平臺的技術資源,請經過AgileEAS.NET SOA 網站:http://www.smarteas.net的最新下載欄目進行下載。
麻煩你們在經過視頻進行學習的時候能及時把問題反饋給樓主,或者有什麼須要改進的一些建議都請向樓主直接反饋,如下是聯繫方式:
AgileEAS.NET網站:http://www.agileeas.net
官方博客:http://eastjade.cnblogs.com
github:https://github.com/agilelab/eas
QQ羣:113723486(AgileEAS SOA 平臺)/上限1000人
199463175(AgileEAS SOA 交流)/上限1000人
120661978(AgileEAS.NET 平臺交流)/上限1000人
郵件:james@agilelab.cn,mail.james@qq.com,
電話:18629261335。