自已動手作高性能消息隊列

前言

        本人以爲碼農的技術提高應該是從how to do到why do,而項目或產品都是從why do到how to do,按題來,因此呢下面先從大的方面介紹一下消息隊列。html

        消息隊列是分佈式高併發面目中必不可少的一部分,隨着互聯網、雲計算、大數據的使用,消息隊列的應用愈來愈多,消息隊列在系統的可伸縮性、穩定性、提高吞吐量等方面有着顯著的做用;它主要的做用通常以下:git

       1.經過異步處理提升系統性能

 

 

        如上圖,在不使用消息隊列服務器的時候,用戶的請求數據直接寫入數據庫,在高併發的狀況下數據庫壓力劇增,使得響應速度變慢。可是在使用消息隊列以後,用戶的請求數據發送給消息隊列以後當即 返回,再由消息隊列的消費者進程從消息隊列中獲取數據,異步寫入數據庫。因爲消息隊列服務器處理速度快於數據庫(消息隊列也比數據庫有更好的伸縮性),所以響應速度獲得大幅改善。

        經過以上分析咱們能夠得出消息隊列具備很好的削峯做用的功能——即經過異步處理,將短期高併發產生的事務消息存儲在消息隊列中,從而削平高峯期的併發事務。 舉例:在電子商務一些秒殺、促銷活動中,合理使用消息隊列能夠有效抵禦促銷活動剛開始大量訂單涌入對系統的衝擊。以下圖所示:github

 

        由於用戶請求數據寫入消息隊列以後就當即返回給用戶了,可是請求數據在後續的業務校驗、寫數據庫等操做中可能失敗。所以使用消息隊列進行異步處理以後,須要適當修改業務流程進行配合,好比用戶在提交訂單以後,訂單數據寫入消息隊列,不能當即返回用戶訂單提交成功,須要在消息隊列的訂單消費者進程真正處理完該訂單以後,甚至出庫後,再經過電子郵件或短信通知用戶訂單成功,以避免交易糾紛。這就相似咱們平時手機訂火車票和電影票。redis

       2.下降系統耦合性

        咱們知道模塊分佈式部署之後聚合方式一般有兩種:1.分佈式消息隊列和2.分佈式服務。先來簡單說一下分佈式服務目前使用比較多的用來構建SOA(Service Oriented Architecture面向服務體系結構)分佈式服務框架是阿里巴巴開源的Dubbo.若是想深刻了解Dubbo的能夠看我寫的關於Dubbo的這一篇文章:《高性能優秀的服務框架-dubbo介紹》咱們知道若是模塊之間不存在直接調用,那麼新增模塊或者修改模塊就對其餘模塊影響較小,這樣系統的可擴展性無疑更好一些。數據庫

        咱們最多見的事件驅動架構相似生產者消費者模式,在大型網站中一般用利用消息隊列實現事件驅動結構。以下圖所示:數組

 

 

        消息隊列使利用發佈-訂閱模式工做,消息發送者(生產者)發佈消息,一個或多個消息接受者(消費者)訂閱消息。 從上圖能夠看到消息發送者(生產者)和消息接受者(消費者)之間沒有直接耦合,消息發送者將消息發送至分佈式消息隊列即結束對消息的處理,消息接受者從分佈式消息隊列獲取該消息後進行後續處理,並不須要知道該消息從何而來。對新增業務,只要對該類消息感興趣,便可訂閱該消息,對原有系統和業務沒有任何影響,從而實現網站業務的可擴展性設計服務器

        消息接受者對消息進行過濾、處理、包裝後,構形成一個新的消息類型,將消息繼續發送出去,等待其餘消息接受者訂閱該消息。所以基於事件(消息對象)驅動的業務架構能夠是一系列流程。session

        另外爲了不消息隊列服務器宕機形成消息丟失,會將成功發送到消息隊列的消息存儲在消息生產者服務器上,等消息真正被消費者服務器處理後才刪除消息。在消息隊列服務器宕機後,生產者服務器會選擇分佈式消息隊列服務器集羣中的其餘服務器發佈消息。架構

        前面說了這麼多消息隊列的重要性、使用場景、工做模式,有不少人就可能會說了,現有的ActiveMQ、RabbitMQ、Kafka、RocketMQ等多了去了,那在項目架構的時候選一個用上去就不行了,徹底沒有必要重複造輪子啊!本人認爲對於重複造輪子的事情和其它任何事情都是同樣的——任何事情沒有絕對的好處或者壞處,好比是剛入門的碼農、又或者很急的項目,徹底能夠選用現有一種通用的、成熟的產品,不必去從零開始作;實際上沒有任何一個優秀的產品所有使用三方的產品來組裝完成的,任何一個好一點的項目發展到必定的時候都不約而同的進行底層開發。緣由很簡單:第一個任何通用型的產品總用功能覆蓋不到的場景;第二個任何通用型的產品爲了實現通用必將作了一些性能或架構的犧牲;如今道理都講完了,開始動手了(都聽你逼半天,能動手就儘可能少逼逼!)。併發

 概述

  動手前先構思一下,本人須要一個簡單的、可發佈訂閱的、高吞吐量的消息隊列,並將之簡單大的方面分紅QServer、QClient;QServer主要有Exchange、Binding、MessageQueue構成;QClient和QServer共用一套相同的傳輸編解碼器QCoder ,主要實現Publish、Subscribe、Unsubcribe、Closes等功能;先想這麼多,開幹!

Exchange

  主要在QServer中提供發佈、訂閱、鏈接、隊列信息等管理

  1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *文件名: Exchange
  8 *版本號: V1.0.0.0
  9 *惟一標識:6a576aad-edcc-446d-b7e5-561a622549bf
 10 *當前的用戶域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:wenguoli_520@qq.com
 13 *建立時間:2018/3/5 16:36:44
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改標記
 18 *修改時間:2018/3/5 16:36:44
 19 *修改人: yswenli
 20 *版本號: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Generic;
 29 using System.Linq;
 30 using System.Text;
 31 using System.Threading;
 32 using System.Threading.Tasks;
 33 
 34 namespace SAEA.QueueSocket.Model
 35 {
 36     class Exchange : ISyncBase
 37     {
 38         object _syncLocker = new object();
 39 
 40         public object SyncLocker
 41         {
 42             get
 43             {
 44                 return _syncLocker;
 45             }
 46         }
 47 
 48         long _pNum = 0;
 49 
 50         long _cNum = 0;
 51 
 52         long _inNum = 0;
 53 
 54         long _outNum = 0;
 55 
 56         private Binding _binding;
 57 
 58         private MessageQueue _messageQueue;
 59 
 60         public Exchange()
 61         {
 62             this._binding = new Binding();
 63 
 64             this._messageQueue = new MessageQueue();
 65         }
 66 
 67 
 68         public void AcceptPublish(string sessionID, QueueResult pInfo)
 69         {
 70             lock (_syncLocker)
 71             {
 72                 this._binding.Set(sessionID, pInfo.Name, pInfo.Topic);
 73 
 74                 this._messageQueue.Enqueue(pInfo.Topic, pInfo.Data);
 75 
 76                 _pNum = this._binding.GetPublisherCount();
 77 
 78                 Interlocked.Increment(ref _inNum);
 79             }
 80         }
 81 
 82         public void AcceptPublishForBatch(string sessionID, QueueResult[] datas)
 83         {
 84             if (datas != null)
 85             {
 86                 foreach (var data in datas)
 87                 {
 88                     if (data != null)
 89                     {
 90                         AcceptPublish(sessionID, data);
 91                     }
 92                 }
 93             }
 94         }
 95 
 96 
 97         public void GetSubscribeData(string sessionID, QueueResult sInfo, int maxSize = 500, int maxTime = 500, Action<List<string>> callBack = null)
 98         {
 99             lock (_syncLocker)
100             {
101                 var result = this._binding.GetBingInfo(sInfo);
102 
103                 if (result == null)
104                 {
105                     this._binding.Set(sessionID, sInfo.Name, sInfo.Topic, false);
106 
107                     _cNum = this._binding.GetSubscriberCount();
108 
109                     Task.Factory.StartNew(() =>
110                     {
111                         while (this._binding.Exists(sInfo))
112                         {
113                             var list = this._messageQueue.DequeueForList(sInfo.Topic, maxSize, maxTime);
114                             if (list != null)
115                             {
116                                 list.ForEach(i => { Interlocked.Increment(ref _outNum); });
117                                 callBack?.Invoke(list);
118                                 list.Clear();
119                                 list = null;
120                             }
121                         }
122                     });
123                 }
124             }            
125         }
126 
127         public void Unsubscribe(QueueResult sInfo)
128         {
129             Interlocked.Decrement(ref _cNum);
130             this._binding.Del(sInfo.Name, sInfo.Topic);
131         }
132 
133         public void Clear(string sessionID)
134         {
135             lock (_syncLocker)
136             {
137                 var data = this._binding.GetBingInfo(sessionID);
138 
139                 if (data != null)
140                 {
141                     if (data.Flag)
142                     {
143                         Interlocked.Decrement(ref _pNum);
144                     }
145                     else
146                     {
147                         Interlocked.Decrement(ref _cNum);
148                     }
149                     this._binding.Remove(sessionID);
150                 }
151             }
152         }
153 
154         public Tuple<long, long, long, long> GetConnectInfo()
155         {
156             return new Tuple<long, long, long, long>(_pNum, _cNum, _inNum, _outNum);
157         }
158 
159         public List<Tuple<string, long>> GetQueueInfo()
160         {
161             List<Tuple<string, long>> result = new List<Tuple<string, long>>();
162             lock (_syncLocker)
163             {
164                 var list = this._messageQueue.ToList();
165                 if (list != null)
166                 {
167                     var tlts = list.Select(b => b.Topic).Distinct().ToList();
168 
169                     if (tlts != null)
170                     {
171                         foreach (var topic in tlts)
172                         {
173                             var count = this._messageQueue.GetCount(topic);
174                             var t = new Tuple<string, long>(topic, count);
175                             result.Add(t);
176                         }
177                         tlts.Clear();
178                     }
179                     list.Clear();
180                 }
181             }
182             return result;
183         }
184 
185     }
186 }

  思惟發散:這裏能夠增長全局消息隊列、指定鏈接消息隊列等;將鏈接經過類型redis cluster模式進行一個均衡分佈等

Binding

  主要功能是將鏈接、主題進行映射管理

  1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *文件名: Binding
  8 *版本號: V1.0.0.0
  9 *惟一標識:7472dabd-1b6a-4ffe-b19f-2d1cf7348766
 10 *當前的用戶域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:wenguoli_520@qq.com
 13 *建立時間:2018/3/5 17:10:19
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改標記
 18 *修改時間:2018/3/5 17:10:19
 19 *修改人: yswenli
 20 *版本號: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Generic;
 29 using System.Linq;
 30 using System.Text;
 31 
 32 namespace SAEA.QueueSocket.Model
 33 {
 34     /// <summary>
 35     /// 鏈接與主題的映射
 36     /// </summary>
 37     class Binding : ISyncBase, IDisposable
 38     {
 39         List<BindInfo> _list = new List<BindInfo>();
 40 
 41         object _syncLocker = new object();
 42 
 43         public object SyncLocker
 44         {
 45             get
 46             {
 47                 return _syncLocker;
 48             }
 49         }
 50 
 51         bool _isDisposed = false;
 52 
 53         int _minutes = 10;
 54 
 55         public Binding(int minutes = 10)
 56         {
 57             _minutes = minutes;
 58 
 59             ThreadHelper.PulseAction(() =>
 60             {
 61                 lock (_syncLocker)
 62                 {
 63                     var list = _list.Where(b => b.Expired <= DateTimeHelper.Now).ToList();
 64                     if (list != null)
 65                     {
 66                         list.ForEach(item =>
 67                         {
 68                             _list.Remove(item);
 69                         });
 70                         list.Clear();
 71                         list = null;
 72                     }
 73                 }
 74             }, new TimeSpan(0, 0, 10), _isDisposed);
 75         }
 76 
 77 
 78         public void Set(string sessionID, string name, string topic, bool isPublisher = true)
 79         {
 80 
 81             lock (_syncLocker)
 82             {
 83                 var result = _list.FirstOrDefault(b => b.Name == name && b.Topic == topic);
 84                 if (result == null)
 85                 {
 86                     _list.Add(new BindInfo()
 87                     {
 88                         SessionID = sessionID,
 89                         Name = name,
 90                         Topic = topic,
 91                         Flag = isPublisher,
 92                         Expired = DateTimeHelper.Now.AddMinutes(_minutes)
 93                     });
 94                 }
 95                 else
 96                 {
 97                     result.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
 98                 }
 99             }
100         }
101 
102         public void Del(string sessionID, string topic)
103         {
104             lock (_syncLocker)
105             {
106                 var result = _list.FirstOrDefault(b => b.Name == sessionID && b.Topic == topic);
107                 if (result != null)
108                 {
109                     _list.Remove(result);
110                 }
111             }
112         }
113 
114         public void Remove(string sessionID)
115         {
116             lock (_syncLocker)
117             {
118                 var result = _list.Where(b => b.SessionID == sessionID).ToList();
119                 if (result != null)
120                 {
121                     result.ForEach((item) =>
122                     {
123                         _list.Remove(item);
124                     });
125                     result.Clear();
126                 }
127             }
128         }
129 
130         public BindInfo GetBingInfo(QueueResult sInfo)
131         {
132             lock (_syncLocker)
133             {
134                 var bi = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
135 
136                 if (bi != null)
137                 {
138                     if (bi.Expired <= DateTimeHelper.Now)
139                     {
140                         Remove(bi.SessionID);
141                     }
142                     else
143                     {
144                         return bi;
145                     }
146                 }
147                 return null;
148             }
149         }
150 
151         public BindInfo GetBingInfo(string sessionID)
152         {
153             lock (_syncLocker)
154             {
155                 return _list.FirstOrDefault(b => b.SessionID == sessionID);
156             }
157         }
158 
159         public bool Exists(QueueResult sInfo)
160         {
161             lock (_syncLocker)
162             {
163                 var data = _list.FirstOrDefault(b => b.Name == sInfo.Name && b.Topic == sInfo.Topic);
164 
165                 if (data != null)
166                 {
167                     if (data.Expired <= DateTimeHelper.Now)
168                     {
169                         Remove(data.SessionID);
170 
171                         return false;
172                     }
173 
174                     data.Expired = DateTimeHelper.Now.AddMinutes(_minutes);
175 
176                     return true;
177                 }
178             }
179             return false;
180         }
181 
182 
183         public IEnumerable<BindInfo> GetPublisher()
184         {
185             lock (_syncLocker)
186             {
187                 return _list.Where(b => b.Flag);
188             }
189         }
190 
191         public int GetPublisherCount()
192         {
193             lock (_syncLocker)
194             {
195                 return _list.Where(b => b.Flag).Count();
196             }
197         }
198 
199         public IEnumerable<BindInfo> GetSubscriber()
200         {
201             lock (_syncLocker)
202             {
203                 return _list.Where(b => !b.Flag);
204             }
205         }
206 
207         public int GetSubscriberCount()
208         {
209             lock (_syncLocker)
210             {
211                 return _list.Where(b => !b.Flag).Count();
212             }
213         }
214 
215 
216         public void Dispose()
217         {
218             _isDisposed = true;
219             lock (_syncLocker)
220             {
221                 _list.Clear();
222                 _list = null;
223             }
224         }
225     }
226 }

  思惟發散:實現多個QServer的主題與隊列映射克隆、或者隊列消息轉發實現容災集羣或大容量集羣等

MessageQueue

  將主題與隊列造成一個映射,並對主題映射進行管理

  1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Model
  7 *文件名: QueueCollection
  8 *版本號: V1.0.0.0
  9 *惟一標識:89a65c12-c4b3-486b-a933-ad41c3db6621
 10 *當前的用戶域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:wenguoli_520@qq.com
 13 *建立時間:2018/3/6 10:31:11
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改標記
 18 *修改時間:2018/3/6 10:31:11
 19 *修改人: yswenli
 20 *版本號: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.Sockets.Interface;
 27 using System;
 28 using System.Collections.Concurrent;
 29 using System.Collections.Generic;
 30 using System.Linq;
 31 using System.Threading.Tasks;
 32 
 33 namespace SAEA.QueueSocket.Model
 34 {
 35     public class MessageQueue : ISyncBase, IDisposable
 36     {
 37         bool _isDisposed = false;
 38 
 39         ConcurrentDictionary<string, QueueBase> _list;
 40 
 41         object _syncLocker = new object();
 42 
 43         public object SyncLocker
 44         {
 45             get
 46             {
 47                 return _syncLocker;
 48             }
 49         }
 50 
 51         public MessageQueue()
 52         {
 53             _list = new ConcurrentDictionary<string, QueueBase>();
 54 
 55             ThreadHelper.Run(() =>
 56             {
 57                 while (!_isDisposed)
 58                 {
 59                     var list = _list.Values.Where(b => b.Expired <= DateTimeHelper.Now);
 60                     if (list != null)
 61                     {
 62                         foreach (var item in list)
 63                         {
 64                             if (item.Length == 0)
 65                             {
 66                                 _list.TryRemove(item.Topic, out QueueBase q);
 67                             }
 68                         }
 69                     }
 70                     ThreadHelper.Sleep(10000);
 71                 }
 72             }, true, System.Threading.ThreadPriority.Highest);
 73         }
 74 
 75 
 76         public void Enqueue(string topic, string data)
 77         {
 78             var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
 79             lock (_syncLocker)
 80             {
 81                 if (queue == null)
 82                 {
 83                     queue = new QueueBase(topic);
 84                     _list.TryAdd(topic, queue);
 85                 }                
 86             }
 87             queue.Enqueue(data);
 88         }
 89 
 90 
 91         public string Dequeue(string topic)
 92         {
 93             var queue = _list.Values.FirstOrDefault(b => b.Topic.Equals(topic));
 94             if (queue != null)
 95             {
 96                 return queue.Dequeue();
 97             }
 98             return null;
 99         }
100 
101         /// <summary>
102         /// 批量讀取數據
103         /// </summary>
104         /// <param name="topic"></param>
105         /// <param name="maxSize"></param>
106         /// <param name="maxTime"></param>
107         /// <returns></returns>
108         public List<string> DequeueForList(string topic, int maxSize = 500, int maxTime = 500)
109         {
110             List<string> result = new List<string>();
111             bool running = true;
112             var m = 0;
113             var task = Task.Factory.StartNew(() =>
114             {
115                 while (running)
116                 {
117                     var data = Dequeue(topic);
118                     if (data != null)
119                     {
120                         result.Add(data);
121                         m++;
122                         if (m == maxSize)
123                         {
124                             running = false;
125                         }
126                     }
127                     else
128                     {
129                         ThreadHelper.Sleep(1);
130                     }
131                 }
132             });
133             Task.WaitAll(new Task[] { task }, maxTime);
134             running = false;
135             return result;
136         }
137 
138         public string BlockDequeue(string topic)
139         {
140             var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
141             if (queue != null)
142             {
143                 return queue.BlockDequeue();
144             }
145             return null;
146         }
147 
148         public List<QueueBase> ToList()
149         {
150             lock (_syncLocker)
151             {
152                 return _list.Values.ToList();
153             }
154         }
155 
156         public long GetCount(string topic)
157         {
158             var queue = _list.Values.FirstOrDefault(b => b.Topic == topic);
159             if (queue != null)
160                 return queue.Length;
161             return 0;
162         }
163 
164         public void Dispose()
165         {
166             _isDisposed = true;
167             _list.Clear();
168                 _list = null;
169         }
170     }
171 }

  思惟發散:增長硬盤持久化以實現down機容災、增長ack確認再移除以實現高可靠性等

QCoder

  在QServer和QClient之間進行傳輸編解碼,這個編解碼的速度直接影響消息隊列的傳輸性能;本人使用了2種方案:1.使用相似redis傳輸方案,使用回車做爲分隔符方式,這種方案結果要麼一個字節一個字節檢查分隔符,這種for操做仍是C、C++屌,C#作這個真心不行;要麼先將字節數組經過Encoding轉換成String再來for,雖然說能提高几倍性能,可是遇到不完整的字節數組時,本人沒有找一個好的方法。2.使用自定義類型+長度+內容這種格式

  1 /****************************************************************************
  2 *Copyright (c) 2018 Microsoft All Rights Reserved.
  3 *CLR版本: 4.0.30319.42000
  4 *機器名稱:WENLI-PC
  5 *公司名稱:Microsoft
  6 *命名空間:SAEA.QueueSocket.Net
  7 *文件名: QCoder
  8 *版本號: V1.0.0.0
  9 *惟一標識:88f5a779-8294-47bc-897b-8357a09f2fdb
 10 *當前的用戶域:WENLI-PC
 11 *建立人: yswenli
 12 *電子郵箱:wenguoli_520@qq.com
 13 *建立時間:2018/3/5 18:01:56
 14 *描述:
 15 *
 16 *=====================================================================
 17 *修改標記
 18 *修改時間:2018/3/5 18:01:56
 19 *修改人: yswenli
 20 *版本號: V1.0.0.0
 21 *描述:
 22 *
 23 *****************************************************************************/
 24 
 25 using SAEA.Commom;
 26 using SAEA.QueueSocket.Model;
 27 using SAEA.QueueSocket.Type;
 28 using SAEA.Sockets.Interface;
 29 using System;
 30 using System.Collections.Generic;
 31 using System.Text;
 32 
 33 namespace SAEA.QueueSocket.Net
 34 {
 35     public sealed class QCoder : ICoder
 36     {
 37         static readonly int MIN = 1 + 4 + 4 + 0 + 4 + 0 + 0;
 38 
 39         private List<byte> _buffer = new List<byte>();
 40 
 41         private object _locker = new object();
 42 
 43         public void Pack(byte[] data, Action<DateTime> onHeart, Action<ISocketProtocal> onUnPackage, Action<byte[]> onFile)
 44         {
 45 
 46         }
 47 
 48         /// <summary>
 49         /// 隊列編解碼器
 50         /// </summary>
 51         public QueueCoder QueueCoder { get; set; } = new QueueCoder();
 52 
 53         /// <summary>
 54         /// 包解析
 55         /// </summary>
 56         /// <param name="data"></param>
 57         /// <param name="OnQueueResult"></param>
 58         public void GetQueueResult(byte[] data, Action<QueueResult> OnQueueResult)
 59         {
 60             lock (_locker)
 61             {
 62                 try
 63                 {
 64                     _buffer.AddRange(data);
 65 
 66                     if (_buffer.Count > (1 + 4 + 4 + 0 + 4 + 0 + 0))
 67                     {
 68                         var buffer = _buffer.ToArray();
 69 
 70                         QCoder.Decode(buffer, (list, offset) =>
 71                         {
 72                             if (list != null)
 73                             {
 74                                 foreach (var item in list)
 75                                 {
 76                                     OnQueueResult?.Invoke(new QueueResult()
 77                                     {
 78                                         Type = (QueueSocketMsgType)item.Type,
 79                                         Name = item.Name,
 80                                         Topic = item.Topic,
 81                                         Data = item.Data
 82                                     });
 83                                 }
 84                                 _buffer.RemoveRange(0, offset);
 85                             }
 86                         });
 87                     }
 88                 }
 89                 catch (Exception ex)
 90                 {
 91                     ConsoleHelper.WriteLine("QCoder.GetQueueResult error:" + ex.Message + ex.Source);
 92                     _buffer.Clear();
 93                 }
 94             }
 95         }
 96 
 97 
 98 
 99         /// <summary>
100         /// socket 傳輸字節編碼
101         /// 格式爲:1+4+4+x+4+x+4
102         /// </summary>
103         /// <param name="queueSocketMsg"></param>
104         /// <returns></returns>
105         public static byte[] Encode(QueueSocketMsg queueSocketMsg)
106         {
107             List<byte> list = new List<byte>();
108 
109             var total = 4 + 4 + 4;
110 
111             var nlen = 0;
112 
113             var tlen = 0;
114 
115             byte[] n = null;
116             byte[] tp = null;
117             byte[] d = null;
118 
119             if (!string.IsNullOrEmpty(queueSocketMsg.Name))
120             {
121                 n = Encoding.UTF8.GetBytes(queueSocketMsg.Name);
122                 nlen = n.Length;
123                 total += nlen;
124             }
125             if (!string.IsNullOrEmpty(queueSocketMsg.Topic))
126             {
127                 tp = Encoding.UTF8.GetBytes(queueSocketMsg.Topic);
128                 tlen = tp.Length;
129                 total += tlen;
130             }
131             if (!string.IsNullOrEmpty(queueSocketMsg.Data))
132             {
133                 d = Encoding.UTF8.GetBytes(queueSocketMsg.Data);
134                 total += d.Length;
135             }
136 
137             list.Add(queueSocketMsg.Type);
138             list.AddRange(BitConverter.GetBytes(total));
139             list.AddRange(BitConverter.GetBytes(nlen));
140             if (nlen > 0)
141                 list.AddRange(n);
142             list.AddRange(BitConverter.GetBytes(tlen));
143             if (tlen > 0)
144                 list.AddRange(tp);
145             if (d != null)
146                 list.AddRange(d);
147             var arr = list.ToArray();
148             list.Clear();
149             return arr;
150         }
151 
152         /// <summary>
153         /// socket 傳輸字節解碼
154         /// </summary>
155         /// <param name="data"></param>
156         /// <param name="onDecode"></param>
157         public static bool Decode(byte[] data, Action<QueueSocketMsg[], int> onDecode)
158         {
159             int offset = 0;
160 
161             try
162             {
163                 if (data != null && data.Length > offset + MIN)
164                 {
165                     var list = new List<QueueSocketMsg>();
166 
167                     while (data.Length > offset + MIN)
168                     {
169                         var total = BitConverter.ToInt32(data, offset + 1);
170 
171                         if (data.Length >= offset + total + 1)
172                         {
173                             offset += 5;
174 
175                             var qm = new QueueSocketMsg((QueueSocketMsgType)data[0]);
176                             qm.Total = total;
177 
178                             qm.NLen = BitConverter.ToInt32(data, offset);
179                             offset += 4;
180 
181 
182                             if (qm.NLen > 0)
183                             {
184                                 var narr = new byte[qm.NLen];
185                                 Buffer.BlockCopy(data, offset, narr, 0, narr.Length);
186                                 qm.Name = Encoding.UTF8.GetString(narr);
187                             }
188                             offset += qm.NLen;
189 
190                             qm.TLen = BitConverter.ToInt32(data, offset);
191 
192                             offset += 4;
193 
194                             if (qm.TLen > 0)
195                             {
196                                 var tarr = new byte[qm.TLen];
197                                 Buffer.BlockCopy(data, offset, tarr, 0, tarr.Length);
198                                 qm.Topic = Encoding.UTF8.GetString(tarr);
199                             }
200                             offset += qm.TLen;
201 
202                             var dlen = qm.Total - 4 - 4 - qm.NLen - 4 - qm.TLen;
203 
204                             if (dlen > 0)
205                             {
206                                 var darr = new byte[dlen];
207                                 Buffer.BlockCopy(data, offset, darr, 0, dlen);
208                                 qm.Data = Encoding.UTF8.GetString(darr);
209                                 offset += dlen;
210                             }
211                             list.Add(qm);
212                         }
213                         else
214                         {
215                             break;
216                         }
217                     }
218                     if (list.Count > 0)
219                     {
220                         onDecode?.Invoke(list.ToArray(), offset);
221                         return true;
222                     }
223                 }
224             }
225             catch (Exception ex)
226             {
227                 ConsoleHelper.WriteLine($"QCoder.Decode error:{ex.Message} stack:{ex.StackTrace} data:{data.Length} offset:{offset}");
228             }
229             onDecode?.Invoke(null, 0);
230             return false;
231         }
232 
233 
234         /// <summary>
235         /// dispose
236         /// </summary>
237         public void Dispose()
238         {
239             _buffer.Clear();
240             _buffer = null;
241         }
242 
243 
244 
245     }
246 }

測試

  簡單的How to do和Why do已經完成了,是時候定義個Producer、Consumer來測試一把了

  1 using SAEA.QueueSocket;
  2 using SAEA.Commom;
  3 using SAEA.QueueSocket.Model;
  4 using System;
  5 using System.Collections.Generic;
  6 using System.Diagnostics;
  7 using System.Text;
  8 using System.Threading;
  9 using System.Threading.Tasks;
 10 
 11 namespace SAEA.QueueSocketTest
 12 {
 13     class Program
 14     {
 15         static void Main(string[] args)
 16         {
 17             do
 18             {
 19                 ConsoleHelper.WriteLine("輸入s啓動隊列服務器,輸入p啓動生產者,輸入c啓動消費者");
 20 
 21                 var inputStr = ConsoleHelper.ReadLine();
 22 
 23                 if (!string.IsNullOrEmpty(inputStr))
 24                 {
 25                     var topic = "測試頻道";
 26 
 27                     switch (inputStr.ToLower())
 28                     {
 29                         case "s":
 30                             ConsoleHelper.Title = "SAEA.QueueServer";
 31                             ServerInit();
 32                             break;
 33                         case "p":
 34                             ConsoleHelper.Title = "SAEA.QueueProducer";
 35                             ConsoleHelper.WriteLine("輸入ip:port鏈接到隊列服務器");
 36                             inputStr = ConsoleHelper.ReadLine();
 37                             ProducerInit(inputStr, topic);
 38                             break;
 39                         case "c":
 40                             ConsoleHelper.Title = "SAEA.QueueConsumer";
 41                             ConsoleHelper.WriteLine("輸入ip:port鏈接到隊列服務器");
 42                             inputStr = ConsoleHelper.ReadLine();
 43                             ConsumerInit(inputStr, topic);
 44                             break;
 45                         default:
 46                             ServerInit();
 47                             inputStr = "127.0.0.1:39654";
 48                             ProducerInit(inputStr, topic);
 49                             ConsumerInit(inputStr, topic);
 50                             break;
 51                     }
 52                     ConsoleHelper.WriteLine("回車退出!");
 53                     ConsoleHelper.ReadLine();
 54                     return;
 55                 }
 56             }
 57             while (true);
 58         }
 59 
 60 
 61 
 62         static QServer _server;
 63         static void ServerInit()
 64         {
 65             _server = new QServer();
 66             _server.OnDisconnected += Server_OnDisconnected;
 67             _server.CalcInfo((ci, qi) =>
 68             {
 69                 var result = string.Format("生產者:{0} 消費者:{1} 收到消息:{2} 推送消息:{3}{4}", ci.Item1, ci.Item2, ci.Item3, ci.Item4, Environment.NewLine);
 70 
 71                 qi.ForEach((item) =>
 72                 {
 73                     result += string.Format("隊列名稱:{0} 堆積消息數:{1} {2}", item.Item1, item.Item2, Environment.NewLine);
 74                 });
 75                 ConsoleHelper.WriteLine(result);
 76             });
 77             _server.Start();
 78         }
 79 
 80         private static void Server_OnDisconnected(string ID, Exception ex)
 81         {
 82             _server.Clear(ID);
 83             if (ex != null)
 84             {
 85                 ConsoleHelper.WriteLine("{0} 已從服務器斷開,err:{1}", ID, ex.ToString());
 86             }
 87         }
 88 
 89         static void ProducerInit(string ipPort, string topic)
 90         {
 91             int pNum = 0;
 92 
 93             //string msg = "主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達MySQL,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。";
 94             string msg = "123";
 95             if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
 96 
 97             QClient producer = new QClient("productor" + Guid.NewGuid().ToString("N"), ipPort);
 98 
 99             producer.OnError += Producer_OnError;
100 
101             producer.OnDisconnected += Client_OnDisconnected;
102 
103             producer.ConnectAsync((s) =>
104             {
105                 Task.Factory.StartNew(() =>
106                 {
107                     var old = 0;
108                     var speed = 0;
109                     while (producer.Connected)
110                     {
111                         speed = pNum - old;
112                         old = pNum;
113                         ConsoleHelper.WriteLine("生產者已成功發送:{0} 速度:{1}/s", pNum, speed);
114                         Thread.Sleep(1000);
115                     }
116                 });
117 
118                 var list = new List<Tuple<string, byte[]>>();
119                 
120 
121                 while (producer.Connected)
122                 {                   
123 
124                     producer.Publish(topic, msg);
125 
126                     Interlocked.Increment(ref pNum);
127                 }
128             });
129 
130 
131         }
132 
133         private static void Producer_OnError(string ID, Exception ex)
134         {
135             ConsoleHelper.WriteLine("id:" + ID + ",error:" + ex.Message);
136         }
137 
138         static void ConsumerInit(string ipPort, string topic)
139         {
140             if (string.IsNullOrEmpty(ipPort)) ipPort = "127.0.0.1:39654";
141             QClient consumer = new QClient("subscriber-" + Guid.NewGuid().ToString("N"), ipPort);
142             consumer.OnMessage += Subscriber_OnMessage;
143             consumer.OnDisconnected += Client_OnDisconnected;
144             consumer.ConnectAsync((s) =>
145             {
146                 Task.Factory.StartNew(() =>
147                 {
148                     var old = 0;
149                     var speed = 0;
150                     while (consumer.Connected)
151                     {
152                         speed = _outNum - old;
153                         old = _outNum;
154                         ConsoleHelper.WriteLine("消費者已成功接收:{0} 速度:{1}/s", _outNum, speed);
155                         Thread.Sleep(1000);
156                     }
157                 });
158 
159                 consumer.Subscribe(topic);
160             });
161 
162         }
163 
164         private static void Client_OnDisconnected(string ID, Exception ex)
165         {
166             ConsoleHelper.WriteLine("當前鏈接已關閉");
167         }
168 
169         static int _outNum = 0;
170 
171         private static void Subscriber_OnMessage(QueueResult obj)
172         {
173             if (obj != null)
174                 _outNum += 1;
175         }
176     }
177 }

  單線程的、單生產者、單消費者、單隊列服務器的測試結果以下圖:

QueueSocketTest

  到此一個自行實現的簡單的消息隊列完成了,雖然說它離實際產品還很遙遠,可是本人仍是覺的技術的提高離不開鑽研,路漫漫其修遠兮,吾將上下而求索!

轉載請標明本文來源:http://www.cnblogs.com/yswenli/p//9029587.html 
更多內容歡迎star做者的github:https://github.com/yswenli/SAEA若是發現本文有什麼問題和任何建議,也隨時歡迎交流~

相關文章
相關標籤/搜索