消息隊列對大多數人應該比較陌生。可是要提到MQ據說過的人會多不少。MQ就是英文單詞"Message queue"的縮寫,翻譯成中文就是消息隊列(我英語差,翻譯錯了請告知)。web
PS:話說國人熟悉MQ比消息隊列多,是否是由於國人的外語水平高於國語水平好幾個數量級數據庫
一、看一下度娘怎麼解釋消息隊列api
參考連接:消息隊列_百度百科數組
度娘解釋消息隊列是在兩臺計算機間傳輸的,套句很時髦的說法就是用來作分佈式傳輸的,是個很高大上的東西服務器
二、個人見解稍有不一樣網絡
我更追溯到「消息隊列」的字面「本源」的意思。我認爲消息隊列就是消息的管理容器工具架構
消息隊列能夠在「兩臺計算機間傳輸」,也能夠同一臺計算機不一樣進行進程間傳輸,甚至是同一進程內「傳輸」併發
三、消息隊列使用主要場景框架
我認爲主要有兩種,一種是排隊先進先出(也有加優先級的),另外一種是消息發佈訂閱模式,固然兩種方式「複合」使用也是能夠的分佈式
四、消息隊列主要解決什麼問題
咱們寫的程序偶爾出現一些「靈異」問題。除了通常的業務邏輯bug外,主要就是折騰服務器了。好比,web服務器cpu滿載、數據庫cpu滿載、內存滿載、磁盤IO滿載、網絡帶寬滿載等等
我總結爲兩點,計算密集型問題(cpu滿載)和資源密集型問題(內存、磁盤、網絡)
咱們要優化程序須要知道究竟是哪一種問題,針對不一樣問題進行不一樣的優化,優化通常無非「開源」和「節流」兩種手段。
「開源」:增長計算能力(含增長cpu和服務器)和增長資源
「節流」:減小「多餘」的邏輯和資源消耗
現實中的狀況很複雜,有的時候很簡單的邏輯(但資源耗費嚴重)也能致使cpu滿載,咱們認爲程序是在「等」資源,其實它在「等」資源的時候依然吞噬了大量的「cpu」,因此把計算和資源消耗「拆分」開不少狀況下更加有效
增長了cpu如何用得上,若是咱們的程序是單線程,就算增長到256個cpu對性能改善也用處不大,另外增長了服務器咱們的邏輯如何還能保持完整,不一樣服務器的程序如何協同工做,那就是「消息隊列」隆重登場的時候了。
前面太抽象,直接上例子了
1、消息訂閱模式
一、消息訂閱代碼
public static void Test() { Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" }, new Soldier { Name = "士兵2" }, new Soldier { Name = "士兵3" } }; Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() }; SubscribeChannel<int> channel = new SubscribeChannel<int>() { MaxDegreeOfParallelism = 3 }; channel.Init(); channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new SubscribeConsume<int>() { Instance = item }); } while (producer.Run()) { } }
解讀一下:
A:定義了一個"隊伍"(士兵(Soldier)數組,等待接受命令(訂閱長官的命令))
B:定義了一個「長官」(命令的生產(發佈)者)
C:定義了一個消息訂閱頻道(SubscribeChannel)並把生產者和消費者都添加進去
這裏面要重點說一下頻道的重要性,你們「通訊」必須在相同頻道才能夠相互溝通。不在一個頻道,人民解放軍是不能聽美國將軍的命令的
另外一個方面也說明能夠經過增長不一樣頻道來創建多個消息隊列,頻道獨立於生產者和消費者,同一個生產者或者消費者也可能同時爲多個消息隊列服務,Very Good!
D:不停的等待「長官」發佈命令直至「長官」本身終止
二、F5
三、「指令」什麼鬼,再看一些代碼
/// <summary> /// 命令操做 /// </summary> public class DirectiveAction : IResultInstance<int> { /// <summary> /// /// </summary> /// <param name="result"></param> /// <returns></returns> public bool Run(ref int result) { System.Threading.Thread.Sleep(2000); Console.Write(string.Concat(string.Concat(DateTime.Now.ToString("hh:mm:ss.fff"), " 請輸入指令:"))); string str = Console.ReadLine(); Console.WriteLine(string.Concat("I say ", str)); if (string.Equals(str, "Exit", StringComparison.CurrentCultureIgnoreCase)) return false; result = IntConverter.Instance.Get(str); return true; } public void OnSuccess() { } public void OnFail() { } public void OnException(Exception ex) { } }
/// <summary> /// 士兵 /// </summary> public class Soldier : IArgInstance<int, bool> { /// <summary> /// 士兵名字 /// </summary> public string Name { get; set; } /// <summary> /// /// </summary> /// <param name="arg"></param> /// <param name="result"></param> /// <returns></returns> public bool Run(int arg, ref bool result) { Directive directive = (Directive)arg; string msg = null; switch (directive) { case Directive.LookForward: msg = "看前看"; break; case Directive.Left: msg = "向左轉"; break; case Directive.Right: msg = "向右轉"; break; case Directive.Behind: msg = "向後轉"; break; case Directive.Attention: msg = "立正"; break; case Directive.Ease: msg = "稍息"; break; case Directive.EyeRight: msg = "向右看齊"; break; case Directive.EyeLeft: msg = "向左看齊"; break; } Console.WriteLine(string.Concat(DateTime.Now.ToString("hh:mm:ss.fff"), " ", Name, msg ?? string.Concat("未知指令(", arg.ToString(),")"))); result = !string.IsNullOrWhiteSpace(msg); return true; } /// <summary> /// /// </summary> /// <param name="arg"></param> public void OnSuccess(int arg) { } /// <summary> /// /// </summary> /// <param name="arg"></param> public void OnFail(int arg) { Console.WriteLine(string.Concat("指令(", arg, ")錯誤")); } /// <summary> /// /// </summary> /// <param name="arg"></param> /// <param name="ex"></param> public void OnException(int arg, Exception ex) { Console.WriteLine(string.Concat("指令(", arg, ")異常,", ex.Message)); } }
/// <summary> /// 指令類型 /// </summary> public enum Directive { LookForward = 0,//看前看 Left = 1,//向左轉 Right = 2,//向右轉 Behind = 3,//向後轉 Attention = 4,//立正 Ease = 5,//稍息 EyeRight = 6,//向右看齊 EyeLeft = 7//向左看齊 }
解釋一下:
A:命令是枚舉(也就是int數字)
B:對於生產者就是接受控制檯輸入的行文本,若是爲「Exit」就結束,不然轉化爲int
C:消費者就是接受到一個int值轉化爲命令枚舉(Directive),執行
四、輸入幾條命令玩玩
Begin 11:54:22.164 請輸入指令:1 I say 1 12:04:23.475 士兵1向左轉 12:04:23.475 士兵3向左轉 12:04:23.475 士兵2向左轉 12:04:25.491 請輸入指令:2 I say 2 12:04:27.562 士兵2向右轉 12:04:27.562 士兵1向右轉 12:04:27.593 士兵3向右轉 12:04:29.608 請輸入指令:3 I say 3 12:04:31.343 士兵1向後轉 12:04:31.343 士兵2向後轉 12:04:31.343 士兵3向後轉 12:04:33.359 請輸入指令:0 I say 0 12:04:35.859 士兵1看前看 12:04:35.859 士兵2看前看 12:04:35.859 士兵3看前看 12:04:37.874 請輸入指令:9 I say 9 12:04:45.983 士兵1未知指令(9) 12:04:45.983 士兵3未知指令(9) 12:04:45.983 士兵2未知指令(9) 12:04:47.999 請輸入指令:2 I say 2 12:04:55.671 士兵1向右轉 12:04:55.671 士兵2向右轉 12:04:55.671 士兵3向右轉 12:04:57.686 請輸入指令:Exit I say Exit End
是否是很酷很好玩
A:因爲使用的是並行計算(Parallel.ForEach),三個士兵的執行順序是隨機的,這樣能更好的利用多cpu資源
B:呵呵,想不到使用電腦練兵這麼簡單,韓信要是能看到這一幕也只能「甘拜下風」了
2、排隊模式
一、排隊模式代碼和訂閱模式稍微不一樣
Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" } , new Soldier { Name = "士兵2" } , new Soldier { Name = "士兵3" } , new Soldier { Name = "士兵4" } , new Soldier { Name = "士兵5" } , new Soldier { Name = "士兵6" } , new Soldier { Name = "士兵7" } , new Soldier { Name = "士兵8" } , new Soldier { Name = "士兵9" } };
private static void Test1(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>(); Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
也解讀一下:
A:此次隊伍壯大了(哈哈,因爲前面練兵成效不錯,部隊迅速擴充了三倍),因此單獨出來以便複用
B:此次先定義了頻道,並且此次的頻道是隊列頻道(QueueChannel)
C:這裏的消費者稍有不一樣,增長了一個Timer屬性,是定時觸發器(每1000毫秒觸發一次)
注:現實項目中,在排隊模式Timer屬性並非必須的,只是這裏演示須要增長一個觸發器。由於控制檯輸入我用來控制生產者了,消費者只能讓觸發器來控制了
D:另外這裏新加一個(channel.Start())用來啓動本身的調度和全部生產者和消費者的調度
這裏只有消費者有調度,這行代碼對頻道和生產者沒有影響
E:這個例子裏面的生產者(DirectiveAction)和消費者(QueueConsumeService)是直接服用前面例子的
二、和前面例子同樣,先輸入幾個命令玩玩,看看結果
Begin 12:43:04.369 請輸入指令:1 I say 1 12:43:09.438 士兵9向左轉 12:43:10.642 請輸入指令:3 I say 3 12:43:29.751 士兵7向後轉 12:43:30.861 請輸入指令:8 I say 8 12:43:42.929 士兵4未知指令(8) 12:43:44.898 請輸入指令:Exit I say Exit End
生產者生產一個命令,9個消費者隨機一個搶到執行
其實就是9個消費者「線程」等待命令執行,大大擴充了消費的效率
固然生產者也能夠是多個,也同時生產,這樣多多的效率是否是比咱們通常同步執行效果高很多啊
PS:可是這裏我想到了一個問題,9個消費者9個獨立的「線程」,效率確實高,可是若是cpu滿載了,別說9個「線程」都不能好好的工做(運行),若是再開多個生產者,你們一塊兒來競爭cpu資源極可能致使問題:
2.1 cpu資源可能出現惡意競爭,cpu大量浪費
其實線程間不斷切換的成本也是挺高的,也是要考慮的因素之一
2.2 cpu資源分配不周
若是大量消費者使用cpu,生產者可能拿到的cpu不夠,生產的"消息"不夠消費者使用
若是大量生產者使用cpu,致使生產堆積,消費者消費不過來也是很糟糕,總體的執行速度仍是很慢(甚至還不如單線程)
三、咱們看一個優化這個問題的例子代碼
private static void Test2(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
和前面代碼區別很小,只是給隊列頻道(QueueChannel)加了一個時間調度(Timer),另外增長了一個屬性BufferAlert(值爲10,表示若是隊列的長度超過這個值就報警(可能觸發減小生產者增長消費者策略))
很明顯,如今除了生產者和消費要消耗cpu外,隊列頻道也要消耗cpu;有人說你這樣能更快鬼才信
哈哈,這裏面「水」就深了。隊列頻道的調度不是白加的,會定時檢查隊列的狀況
若是隊列在增長,它會嘗試減小生產者增長消費者,甚至停掉全部生產者
若是隊列在較少,它會嘗試減小消費者增長生產者,甚至停掉全部消費者
什麼個意思呢,cpu也是資源,優化須要從開源和節流兩方面下手,你一味增長邏輯線程物理線程不夠用了只有害處沒有好處;這個時候停掉一些其餘的邏輯線程來釋放物理線程就比增長邏輯線程更加有效
四、繼續看測試結果
01:09:44.553 請輸入指令:1 I say 1 01:09:46.631 士兵7向左轉 01:09:47.662 請輸入指令:4 I say 4 01:09:48.647 士兵3立正 01:09:50.100 請輸入指令:2 I say 2 01:10:58.621 士兵9向右轉 01:11:00.277 請輸入指令:1 I say 1 01:11:04.701 士兵9向左轉 01:11:05.967 請輸入指令:9 I say 9 01:11:14.842 士兵9未知指令(9)
A:前兩條仍是9個消費者「搶着」執行
B:後面三條就都是最後一個消費(士兵9)一個執行了,另外8個消費者被「優化」掉了
有的人可能看不明白了,既然9個消費者你要「優化」掉8個,你還不如只定義一個消費者
哈哈別急,另外8個消費者還在呢,隨時待命呢,咱們來個模擬短暫併發
五、先看看模擬代碼
public class DirectiveProduce : Produce<int> { public override object SendMessage(Data.IEntityAdd<int> channel, int message) { if (message < 10) return base.SendMessage(channel, message); for (int i = message; i > 0; i--) { base.SendMessage(channel, i % 8); } return message; } }
咱們對生產者作了改動,若是傳了的消息>=10,咱們就把這個消息拆分爲n份來發送,這樣必然致使消息堆積
六、檢測一下消息堆積後的效果
01:20:48.089 請輸入指令:2 I say 2 01:23:19.149 士兵9向右轉 01:23:20.899 請輸入指令:4 I say 4 01:23:21.180 士兵9立正 01:23:22.914 請輸入指令:19 I say 19 01:23:36.337 士兵9向後轉 01:23:37.353 士兵9向右轉 01:23:38.291 請輸入指令:01:23:38.353 士兵9向左轉 01:23:38.431 士兵6向左看齊 01:23:38.431 士兵7稍息 01:23:38.431 士兵2向右轉 01:23:38.431 士兵8看前看 01:23:38.431 士兵4向右看齊 01:23:38.431 士兵3立正 01:23:38.431 士兵1向左轉 01:23:38.431 士兵5向後轉 01:23:39.369 士兵9看前看 01:23:39.447 士兵4向左看齊 01:23:39.447 士兵1向右看齊 01:23:39.447 士兵5向右轉 01:23:39.447 士兵7稍息 01:23:39.447 士兵8立正 01:23:39.447 士兵6向後轉 01:23:39.447 士兵3向左轉 5 I say 5 01:25:47.964 士兵9稍息 01:25:49.230 請輸入指令:6 I say 6 01:25:49.980 士兵9向右看齊
A:程序啓動我故意等了一會再輸入,就是等把消費者線程優化後再測試
B:前面兩個指令是「士兵9」執行的毫無疑問
C:後面我批量模擬了19個指令,其餘消費者陸續都出來了
消費者的週期是1秒,19個指令9個消費者全開啓了,3秒就所有執行完了,效果是否是槓槓的
D:後來我又等了一會,再輸入兩個指令又都只有「士兵9」在執行,也就是說執行批處理後,其餘8個消費者又陸續被「優化」掉了
這個是優化消費者,若是是生產者也是時間調取也會被優化,效果是同樣,這裏就不舉例了
PS:這裏我仍是發現一個問題,時間調度(Timer)的問題,若是時間調度週期設置過短,執行任務是更加及時,可是cpu耗費太多,設置太長效率又過低,而併發老是在不經意發生,雖然能夠動態增長生產者和消費者,可是生產者和消費的線程仍是可能浪費嚴重。
這個我深有感觸,我原來開發消息隊列初期這個週期是寫死,有的業務任務是天天甚至是每週、每個月執行一次,週期設過小浪費cpu;有的業務偶發每秒10多個甚至幾十個,每秒一個調度都遠不夠用
爲了這個問題,我修改了屢次調度週期的值,都沒法知足各類需求,並且消息隊列還時常成爲cpu和內存的殺手,偶發cpu滿載、內存泄露、進程假死,一段悲催的時光
後來,我」發明「了一個東西,我給它取名爲」雙向變速齒輪「,這個是我加班坐滾梯去餐廳吃飯時想到的。你看着滾梯在慢速運行,你一踩上去電梯就加速了,你下來後不久,電梯又變慢了。我就嘗試在本身的消息隊列中實現這樣的邏輯。等這個邏輯成熟了,每種任務都基本上相安無事了,結果是So Perfect。
七、變速齒輪消息隊列例子代碼
private static void Test3(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); NumIncreaser increaser = new NumIncreaser { Min = 2000, Max = 60000 }; NumLower lower = new NumLower { Min = 5, Max = 1000 }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.AutoJobTimer { Interval = 1000, Increaser = increaser, Lower = lower } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
這個例子和前面例子區別很小,就是把JobTimer換成AutoJobTimer,AutoJobTimer有兩個屬性,Increaser是週期變長(減速齒輪),Lower是週期變短(加速齒輪)
Increaser是在2秒和1分鐘之間變更
Lower是在5毫秒到1秒鐘之間變更
也就是說,這樣設置把消費者實際(通常狀況下輪訓)的週期從1秒拉長爲1分鐘,節省了98%的輪訓cpu
可是卻在高併發的狀況下加速99%
字面上算起來就是這樣,具體效果咱們來測試一下
八、變速齒輪運行結果
A:以上例子是併發1000個消息,因爲控制檯一頁已經顯示不下了,直好把最前和最後各截圖一張
B:先後13秒1000個任務執行完畢,對比前面沒加變速齒輪的是3秒執行了19個任務,初算下來加速92%(這個測試會受到採集數據的精度及每次測試的偶發狀況稍後不一樣)
以上都是進程內的消息隊列測試,分佈式的消息隊列更高大上,能不能也支持很好呢?
固然能夠,前面展現的很清楚,消息隊列由生產者、消費者、消息頻道(排隊頻道和訂閱頻道)三大塊組成,且每一塊都相互獨立,要接入分佈式按此模式便可,不少狀況下只須要定義其中一塊甚至只是其中一塊的一部分
3、MSMQ(微軟消息隊列)的例子
一、先看生產者測試代碼
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Test1(queue); } private static void Test1(IEntityQueue<int> queue) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); channel.Init(); channel.Start(); while (producer.Run()) { } }
注:以上代碼和前面的代碼仍是很類似,區別以下:
A:這個代碼中只有頻道和生產者,消息頻道和生產者的類和之前的同樣(使用MSMQ不須要定義新的消息頻道和生產者)
B:消息頻道指定了使用MSMQ的隊列(封裝MSMQ實現框架的IEntityQueue接口便可)
面向接口可擴展框架嘛,認得是接口並非具體實現
二、再看消費者測試代碼
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" } , new Soldier { Name = "士兵2" } , new Soldier { Name = "士兵3" } , new Soldier { Name = "士兵4" } , new Soldier { Name = "士兵5" } , new Soldier { Name = "士兵6" } , new Soldier { Name = "士兵7" } , new Soldier { Name = "士兵8" } , new Soldier { Name = "士兵9" } }; Test1(queue, soldiers); }
private static void Test1(IEntityQueue<int> queue, Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); }
以上生產者和消費在不一樣的控制檯程序中測試,模擬兩臺計算機
A:這個代碼中只有頻道和消費者,消息頻道和生產者的類和之前的同樣(使用MSMQ也不須要定義新的消費者)
消息頻道定義和生產者測試程序徹底同樣,實際上是兩臺計算機(或者兩個進程),寫程序的時候就像是一臺同樣
B:消息頻道指定了使用MSMQ的隊列(封裝MSMQ實現框架的IEntityQueue接口便可)
三、分別執行結果以下
生產者和消費者都在隨時待命
四、輸入幾個命令玩玩
生產者和消費者的執行時間差在2秒左右,含我操做輸入時間,算我輸入1秒左右,生產者和消費者時間差在1秒左右
注:不要糾結生產者的第一個提示信息時間,因第一個提示信息後我並無當即輸入,等了一會致使較大時間差
五、爲了說明真的是走的MSMQ,咱們先把消費者關掉,看系統的消息隊列列表
注:以上是我又生產的4個消息
注:以上是MSMQ消息隊列截圖
注:以上是最後一條消息的正文截圖
能夠看到,這裏使用的xml序列化的方式,有人可能會說,xml很差,性能很差等云云,其實對於使用哪一種序列化也是面向接口的,徹底能夠配置
五、用Json發個消息的代碼
public static void TestJson() { Fang.Serialization.DataContractJson.Provider provider0 = new Fang.Serialization.DataContractJson.Provider(); Fang.Serialization.Formater<QueueTest, System.IO.Stream> formater = new Serialization.Formater<QueueTest, System.IO.Stream>() { Provider = provider0 }; Fang.MSMQ.Queue<QueueTest> queue = new Fang.MSMQ.Queue<QueueTest>() { Name = "QueueTest", Serializer = formater, Deserializer = formater }; queue.Init(); QueueChannel<QueueTest> channel = new QueueChannel<QueueTest>() { QueueBuffer = queue, }; Produce<QueueTest> producer = new Produce<QueueTest>() { }; channel.AddProducer(producer); channel.Init(); channel.Start(); producer.SendMessage(new QueueTest { Name = "Test1" }); }
在新的消息隊列中使用Json序列化的方式發的
六、MSMQ消息隊列很是簡單,公佈源代碼給你們看一下(還沒徹底寫完,不少功能待完善)
/// <summary> /// MSMQ隊列 /// </summary> /// <typeparam name="TEntity"></typeparam> public class Queue<TEntity> : Formatter<TEntity>, IEntityQueue<TEntity>, IUnitInit { #region 定義 /// <summary> /// /// </summary> public Queue() { Host = "."; Path = "Private$"; Name = "Default"; BufferSize = Int16.MaxValue; if (ReceiveTimeOut < 1) ReceiveTimeOut = 1000D; _receiveTime = TimeSpan.FromMilliseconds(ReceiveTimeOut); } private MessageQueue _provider; /// <summary> /// 隊列功能支持原始對象 /// </summary> public MessageQueue Provider { get { return _provider; } } private TimeSpan _receiveTime; /// <summary> /// /// </summary> public string Path { get; set; } /// <summary> /// /// </summary> public string Name { get; set; } /// <summary> /// 主機 /// </summary> public string Host { get; set; } /// <summary> /// 單位(Millisecond) /// </summary> public double ReceiveTimeOut { get; set; } #endregion #region IEntityQueue<TEntity> /// <summary> /// /// </summary> /// <param name="entity"></param> /// <returns></returns> public bool TryDequeue(ref TEntity entity) { if (_provider == null) return false; Message message = null; try { message = _provider.Receive(_receiveTime); } catch { return false; } if (message == null) return false; return Transform.TryConvert<object, TEntity>(message.Body, ref entity); } /// <summary> /// /// </summary> /// <returns></returns> public TEntity Dequeue() { TEntity entity = default(TEntity); TryDequeue(ref entity); return entity; } /// <summary> /// /// </summary> /// <param name="entity"></param> /// <returns></returns> public bool Enqueue(TEntity entity) { if (entity == null || _provider == null) return false; Message message = new Message(); message.Body = entity; message.Formatter = this; try { _provider.Send(message); } catch { return false; } return true; } /// <summary> /// /// </summary> public int BufferSize { get; set; } /// <summary> /// /// </summary> /// <returns></returns> public int Count() { if (_provider == null) return 0; return 100; } #endregion #region IUnitInit /// <summary> /// 初始化 /// </summary> public void Init() { string fullName = string.Concat(Host, "\\", Path, "\\", Name); if (!MessageQueue.Exists(fullName)) { MessageQueue.Create(fullName); } _provider = new MessageQueue(fullName); _provider.Formatter = this; } #endregion /// <summary> /// 清空 /// </summary> public void Clear() { if (_provider == null) return; _provider.Purge(); } //public static string CreatePath(string host,string path) //{ //} }
/// <summary> /// MSMQ格式化(序列化和反序列化) /// </summary> public class Formatter<TEntity> : IMessageFormatter, ICloneable { /// <summary> /// /// </summary> public Formatter() { Capacity = 1024; } #region 配置 /// <summary> /// /// </summary> public int Capacity { get; set; } /// <summary> /// 序列化工具 /// </summary> public ISerialize<TEntity, Stream> Serializer { get; set; } /// <summary> /// 反序列化工具 /// </summary> public IDeserialize<Stream, TEntity> Deserializer { get; set; } #endregion #region IMessageFormatter /// <summary> /// /// </summary> /// <param name="message"></param> /// <returns></returns> public bool CanRead(Message message) { return message.BodyStream != null && message.BodyStream.Length > 0; } /// <summary> /// /// </summary> /// <param name="message"></param> /// <returns></returns> public object Read(Message message) { if (message == null) return null; IDeserialize<Stream, TEntity> instance = Deserializer ?? GlobalServices.Instance.CreateDeserializer<Stream, TEntity>(); TEntity entity = default(TEntity); try { Transform.TryDeserialize<Stream, TEntity>(instance, message.BodyStream, ref entity); } catch { } return entity; } /// <summary> /// /// </summary> /// <param name="message"></param> /// <param name="obj"></param> public void Write(Message message, object obj) { TEntity entity = default(TEntity); if (!Transform.TryConvert<object, TEntity>(obj, ref entity)) return; Stream stream = new System.IO.MemoryStream(Capacity); ISerialize<TEntity, Stream> instance = Serializer ?? GlobalServices.Instance.CreateSerializer<TEntity, Stream>(); try { Transform.TrySerialize<TEntity, Stream>(instance, entity, ref stream); } catch { } message.BodyStream = stream; } #region ICloneable /// <summary> /// /// </summary> /// <returns></returns> public object Clone() { return this; } #endregion #endregion }
消息隊列這個組件我開發測試了比較長的時間,也是項目開發關鍵。之後還要整合爲框架的基礎服務,這樣用起來就更加方便和簡單。
分佈式消息隊列一直都是高大上的東西,不少分佈式架構都是以分佈式消息隊列爲基石來構架。
我有一句話常常說,」好鋼要用在刀刃上,好東西不能濫用「。分佈式消息隊列也是,咱們用分佈式任務解決計算密集型問題仍是資源密集型問題。若是都不是,可不能夠嘗試更多的內存(進程內)消息隊列,說不定驚喜多多啊。
我也力爭讓分佈式消息隊列和內存(進程內)消息隊列編碼上幾乎沒有區別,這樣」兩種「消息隊列就能夠很方便的切換。
網上有不少優秀開源消息隊列,用法各異,我打算封裝爲一致的api(只是配置不同),來把消息隊列發揚廣大。