它是一種異步傳輸模式,能夠在不一樣的應用之間實現相互通訊,相互通訊的應用能夠分佈在同一臺機器上,也能夠分佈於相連的網絡空間中的任一位置。html
它的實現原理是:消息的發送者把本身想要發送的信息放入一個Message中,而後把它保存至一個系統公用空間的消息隊列(Message Queue)中;本地或者是異服務器
地的消息接收程序再從該隊列中取出發給它的消息進行處理。如圖所示:網絡
優勢:支持離線通信;有消息優先級;有保障的消息傳遞和執行許多業務處理的可靠的防故障機制;息傳遞機制使得消息通訊的雙方具備不一樣的物理平臺成爲可能。多線程
缺點:很難知足實時交互需求。併發
一、數據採集:適合多設備多應用數據採集功能。app
二、輔助實時交互:在大併發系統中,某一個操做涉及到不少步驟,某些步驟是不須要及時處理的,將不須要及時處理的步驟提出來,用消息隊列處理。異步
好比:在一個高併發購物網站,一個顧客下單,顧客的操做記錄、顧客的餘額記錄、商品相關的統計記錄是不須要及時處理的,這些能夠放消息隊列處理,ide
延時處理。函數
三、多線程流水線處理:能夠應用於生產者和消費者模式裏面。好比:批量文件解壓+解析+入庫處理。爲了下降服務器壓力,將各個步驟分佈在不一樣的高併發
服務器上面,每一個步驟的結果能夠放入消息隊列。
在使用MSMQ開發以前,須要安裝消息隊列。按順序操做:控制面板-》程序-》打開或關閉Windows功能,勾選MSMQ服務器全部選項,如圖所示:
在調試、操做隊列的時候,插入和取出隊列信息之後,須要查看操做是否成功,就得去存放隊列的服務器中查看隊列信息。操做順序:計算機-》管理-》
服務和應用程序,在裏面有相應隊列存儲的信息,如圖所示:
文件做用:
MSMQ隊列操做的屬性設置:
/// <summary> /// MSMQ隊列操做的屬性設置 /// </summary> public class MessageQueueSettings { /// <summary> /// 專用隊列的基本路徑 /// </summary> private string basePath = "private$"; /// <summary> /// 隊列的ID /// </summary> public Guid MessageQueueId { get; set; } /// <summary> /// 隊列的標籤 /// </summary> public string MessageQueueLabel { get; set; } = "由程序建立的默認消息隊列"; /// <summary> /// 訪問隊列的路徑,默認是本機,記作「.」 /// </summary> public string RemotePath { get; set; } = "."; /// <summary> /// 隊列名稱:默認是 msmqdefault /// </summary> public string MessageQueueName { get; set; } = ConfigSource.DefaultMSMQName; /// <summary> /// 隊列的路徑 /// </summary> public string MessageQueuePath => $"{RemotePath}\\{basePath}\\{MessageQueueName}"; /// <summary> /// 消息隊列類型 /// </summary> public QueueType MessageQueueType { get; set; } = QueueType.PrivateQueue; /// <summary> /// 獲取或設置隊列日誌的大小 /// </summary> public long MaximumJournalSize { get; set; } = uint.MaxValue; /// <summary> /// 是否啓用日誌 /// </summary> public bool UseJournalQueue { get; set; } = true; /// <summary> /// 獲取或設置隊列的大小 /// </summary> public long MaximumQueueSize { get; set; } = uint.MaxValue; /// <summary> /// 是否啓用隊列事務 /// </summary> public bool IsUseTransaction { get; set; } = true; /// <summary> /// 隊列的訪問方式 /// </summary> public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive; /// <summary> /// 設置/獲取是否通過身份驗證 /// </summary> public bool Authenticate { get; set; } = false; /// <summary> /// 獲取或設置基優先級,「消息隊列」使用該基優先級在網絡上傳送公共隊列的消息。 /// </summary> public short BasePriority { get; set; } = 0; }
隊列類型enum:
/// <summary> /// 隊列類型 /// </summary> public enum QueueType { /// <summary> /// 私有隊列 /// </summary> PrivateQueue = 0, /// <summary> /// 公共隊列 /// </summary> PublicQueue = 1, /// <summary> /// 管理隊列 /// </summary> ManageQueue = 2, /// <summary> /// 響應隊列 /// </summary> ResponseQueue = 3 }
邏輯代碼裏面不直接調用隊列建立方法,直接用操做基類的構造方法建立。思路:初始化參數實體-》初始化待參數實體的操做基類實例-》實例裏面構造方法
判斷是否存在存在該隊列,若是存在就返回該隊列實例,若是不存在,就建立隊列,返回實例。
基礎操做類:
#region 構造函數 /// <summary> /// 默認構造函數:不對外開放 /// </summary> private MessageQueueBase() { } /// <summary> /// 構造函數 /// </summary> /// <param name="setting">消息隊列的配置</param> public MessageQueueBase(MessageQueueSettings setting) { try { this.msetting = setting; // 校驗是否存在隊列 if (msetting.RemotePath == "." && !Exists()) this.mqueue = Create(); else this.mqueue = new MessageQueue(msetting.MessageQueuePath, msetting.AccessMode); if (msetting.RemotePath == ".") { // 設置是否通過身份驗證:默認爲否 this.mqueue.Authenticate = msetting.Authenticate; // 是否啓用日誌隊列 this.mqueue.UseJournalQueue = msetting.UseJournalQueue; // 最大日誌隊列長度:最大值爲uint.MaxValue this.mqueue.MaximumJournalSize = msetting.MaximumJournalSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize; // 最大隊列長度:最大值爲uint.MaxValue this.mqueue.MaximumQueueSize = msetting.MaximumQueueSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize; // 隊列標籤 this.mqueue.Label = msetting.MessageQueueLabel; } // 設置基優先級,默認是0 this.mqueue.BasePriority = msetting.BasePriority; // 獲取消息隊列的id msetting.MessageQueueId = mqueue.Id; } catch (Exception ex) { throw ex; } } ~MessageQueueBase() { this.mqueue?.Close(); GC.SuppressFinalize(this); } #endregion #region 操做方法 /// <summary> /// 驗證隊列是否存在 /// </summary> /// <returns>驗證結果</returns> public bool Exists() { var successed = false; try { successed = MessageQueue.Exists(msetting.MessageQueuePath); } catch (Exception ex) { throw ex; } return successed; } /// <summary> /// 建立隊列 /// </summary> /// <param name="type">隊列類型(默認是專用隊列)</param> /// <returns>隊列對象</returns> public MessageQueue Create() { MessageQueue queue = default(MessageQueue); try { queue = MessageQueue.Create(msetting.MessageQueuePath, msetting.IsUseTransaction); } catch (Exception ex) { throw ex; } return queue; } /// <summary> /// 刪除隊列 /// </summary> public void Delete() { try { MessageQueue.Delete(msetting.MessageQueuePath); } catch (Exception ex) { throw ex; } } #endregion
業務調用類:
/// <summary> /// 建立消息隊列 /// </summary> /// <param name="queueName">隊列名稱</param> /// <param name="transaction">是不是事務隊列</param> /// <returns></returns> public bool CreateMessageQueue(string queueName, bool transaction) { bool result = false; MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName= queueName, IsUseTransaction= transaction }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); result=messageQueueBase.Exists(); return result; }
測試結果:
發送消息分爲單條發送和列表發送,是否啓用事務,以當前需求和隊列屬性決定。
基礎操做類:
#region 發送操做 /// <summary> /// 發送數據到隊列 /// </summary> /// <typeparam name="T">發送的數據類型</typeparam> /// <param name="t">發送的數據對象</param> public void Send<T>(T t) { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; using (Message message = new Message()) { tran?.Begin(); message.Body = t; message.Label = $"推送時間[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]"; message.UseDeadLetterQueue = true; message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0); message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0); if (tran != null) { mqueue.Send(message, tran); } else { mqueue.Send(message); } tran?.Commit(); tran?.Dispose(); } } catch (Exception ex) { tran?.Abort(); tran?.Dispose(); throw ex; } } /// <summary> /// 發送多條數據 /// </summary> /// <typeparam name="T">發送的數據類型</typeparam> /// <param name="ts">發送的數據對象集合</param> public void SendList<T>(IList<T> ts) { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; tran?.Begin(); foreach (var item in ts) { using (Message message = new Message()) { message.Body = item; message.Label = $"推送時間[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]"; message.UseDeadLetterQueue = true; message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0); message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0); if (tran != null) { mqueue.Send(message, tran); } else { mqueue.Send(message); } } } tran?.Commit(); tran?.Dispose(); } catch (Exception ex) { tran?.Abort(); tran?.Dispose(); throw ex; } } #endregion
業務調用類:
/// <summary> /// 發送消息 /// </summary> /// <param name="queueName">隊列名稱</param> /// <param name="priority">優先級</param> /// <param name="isTransaction">是否啓用隊列事務</param> /// <param name="userInfo">新增用戶信息</param> public void SendMessage(string queueName, short priority, bool isTransaction, user userInfo) { MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName, BasePriority = priority, IsUseTransaction = isTransaction }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); messageQueueBase.Send<user>(userInfo); }
測試結果:
消息接收分爲同步接收和異步接收。
基礎操做類:
#region 接收方法 /// <summary> /// 同步獲取隊列消息 /// </summary> /// <typeparam name="T">返回的數據類型</typeparam> /// <param name="action">獲取數據後的回調</param> public void Receive<T>(Action<T> action) where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); Message message = mqueue.Receive(); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("隊列獲取的類型與泛型類型不符"); } catch (Exception ex) { throw ex; } } /// <summary> /// 同步查詢但不移除隊列第一條數據 /// </summary> /// <typeparam name="T">返回的數據類型</typeparam> /// <param name="action">獲取數據後的回調</param> public void Peek<T>(Action<T> action) where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); Message message = mqueue.Peek(); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("隊列獲取的類型與泛型類型不符"); } catch (Exception ex) { throw ex; } } /// <summary> /// 獲取全部消息 /// </summary> /// <typeparam name="T">數據類型</typeparam> /// <returns>取出來的數據</returns> public IList<T> GetAll<T>() where T : class { try { mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); var enumerator = mqueue.GetMessageEnumerator2(); var ret = new List<T>(); while (enumerator.MoveNext()) { var messageInfo = enumerator.Current.Body as T; if (messageInfo != null) { ret.Add(messageInfo); } enumerator.RemoveCurrent(); enumerator.Reset(); } return ret; } catch (Exception ex) { throw ex; } } /// <summary> /// 異步接收隊列消息,並刪除接收的消息數據 /// </summary> /// <typeparam name="T">返回的數據類型</typeparam> /// <param name="action">獲取數據後的回調</param> public void ReceiveAsync<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initReceiveAsync) { mqueue.ReceiveCompleted += (sender, e) => { Message message = mqueue.EndReceive(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("隊列獲取的類型與泛型類型不符"); }; initReceiveAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } catch (Exception ex) { throw ex; } } /// <summary> /// 啓動一個沒有超時設定的異步查看操做。直到隊列中出現消息時,才完成此操做。 /// </summary> /// <typeparam name="T">獲取的消息數據的類型</typeparam> /// <param name="action">獲取數據後的回調</param> public void PeekAsync<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initPeekAsync) { mqueue.PeekCompleted += (sender, e) => { Message message = mqueue.EndPeek(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("隊列獲取的類型與泛型類型不符"); }; initPeekAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); tran?.Begin(); mqueue.BeginPeek(); tran?.Commit(); } catch (Exception ex) { throw ex; } } /// <summary> /// 異步循環獲取隊列消息,調用此方法將一直接收隊列數據,直到IsStopReceiveLoop被設置爲true /// </summary> /// <typeparam name="T">返回的數據類型</typeparam> /// <param name="action">獲取數據後的回調</param> public void ReceiveAsyncLoop<T>(Action<T> action) where T : class { MessageQueueTransaction tran = null; try { tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null; if (!initReceiveAsync) { mqueue.ReceiveCompleted += (sender, e) => { Message message = mqueue.EndReceive(e.AsyncResult); T obj = message.Body as T; if (obj != null) action(obj); else throw new InvalidCastException("隊列獲取的類型與泛型類型不符"); //只要不中止就一直接收 if (!IsStopReceiveLoop) { tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } }; initReceiveAsync = true; } mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) }); IsStopReceiveLoop = false; tran?.Begin(); mqueue.BeginReceive(); tran?.Commit(); } catch (Exception ex) { throw ex; } } #endregion
業務調用類:
/// <summary> /// 接收消息 /// </summary> /// <param name="queueName">隊列</param> /// <param name="isAsy">是否異步</param> public user ReceiveMessage(string queueName, bool isAsy) { user userInfo = new user(); MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); if (!isAsy) { messageQueueBase.Receive<user>(p => { userInfo = p; }); } else { messageQueueBase.ReceiveAsync<user>(p => { userInfo = p; }); } return userInfo; } /// <summary> /// 獲取所有 /// </summary> /// <param name="queueName"></param> /// <returns></returns> public IList<user> GetAll(string queueName) { MessageQueueSettings mqs = new MessageQueueSettings() { MessageQueueName = queueName }; IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs); return messageQueueBase.GetAll<user>(); }
測試結果:
點擊插入數據,在隊列中插入N條數據。同步接收和異步接收每次只獲取隊列第一條數據;異步循環是每次獲取一條數據,循環獲取,知道隊列沒有數據;
接收所有是一次性獲取指定隊列全部數據。
下一篇寫Dapper應用或Redis應用。