消息隊列應用

正文

簡介

        它是一種異步傳輸模式,能夠在不一樣的應用之間實現相互通訊,相互通訊的應用能夠分佈在同一臺機器上,也能夠分佈於相連的網絡空間中的任一位置。html

它的實現原理是:消息的發送者把本身想要發送的信息放入一個Message中,而後把它保存至一個系統公用空間的消息隊列(Message Queue)中;本地或者是異服務器

地的消息接收程序再從該隊列中取出發給它的消息進行處理。如圖所示:網絡

    

優缺點與使用場景

  • 優缺點

    優勢:支持離線通信;有消息優先級;有保障的消息傳遞和執行許多業務處理的可靠的防故障機制;息傳遞機制使得消息通訊的雙方具備不一樣的物理平臺成爲可能。多線程

        缺點:很難知足實時交互需求。併發

  • 使用場景

        一、數據採集:適合多設備多應用數據採集功能。app

        二、輔助實時交互:在大併發系統中,某一個操做涉及到不少步驟,某些步驟是不須要及時處理的,將不須要及時處理的步驟提出來,用消息隊列處理。異步

好比:在一個高併發購物網站,一個顧客下單,顧客的操做記錄、顧客的餘額記錄、商品相關的統計記錄是不須要及時處理的,這些能夠放消息隊列處理,ide

延時處理。函數

        三、多線程流水線處理:能夠應用於生產者和消費者模式裏面。好比:批量文件解壓+解析+入庫處理。爲了下降服務器壓力,將各個步驟分佈在不一樣的高併發

服務器上面,每一個步驟的結果能夠放入消息隊列。

準備工做

  • 環境準備

        在使用MSMQ開發以前,須要安裝消息隊列。按順序操做:控制面板-》程序-》打開或關閉Windows功能,勾選MSMQ服務器全部選項,如圖所示:

        

  • 存放位置 

       在調試、操做隊列的時候,插入和取出隊列信息之後,須要查看操做是否成功,就得去存放隊列的服務器中查看隊列信息。操做順序:計算機-》管理-》

服務和應用程序,在裏面有相應隊列存儲的信息,如圖所示:

        

MSMQ的代碼封裝

  • 基礎準備

         文件做用:

   

      MSMQ隊列操做的屬性設置:

    /// <summary>
    /// MSMQ隊列操做的屬性設置
    /// </summary>
    public class MessageQueueSettings
    {
        /// <summary>
        /// 專用隊列的基本路徑
        /// </summary>
        private string basePath = "private$";

        /// <summary>
        /// 隊列的ID
        /// </summary>
        public Guid MessageQueueId { get; set; }

        /// <summary>
        /// 隊列的標籤
        /// </summary>
        public string MessageQueueLabel { get; set; } = "由程序建立的默認消息隊列";

        /// <summary>
        /// 訪問隊列的路徑,默認是本機,記作「.」
        /// </summary>
        public string RemotePath { get; set; } = ".";

        /// <summary>
        /// 隊列名稱:默認是 msmqdefault
        /// </summary>
        public string MessageQueueName { get; set; } = ConfigSource.DefaultMSMQName;

        /// <summary>
        /// 隊列的路徑
        /// </summary>
        public string MessageQueuePath => $"{RemotePath}\\{basePath}\\{MessageQueueName}";

        /// <summary>
        /// 消息隊列類型
        /// </summary>
        public QueueType MessageQueueType { get; set; } = QueueType.PrivateQueue;

        /// <summary>
        /// 獲取或設置隊列日誌的大小
        /// </summary>
        public long MaximumJournalSize { get; set; } = uint.MaxValue;

        /// <summary>
        /// 是否啓用日誌
        /// </summary>
        public bool UseJournalQueue { get; set; } = true;

        /// <summary>
        /// 獲取或設置隊列的大小
        /// </summary>
        public long MaximumQueueSize { get; set; } = uint.MaxValue;

        /// <summary>
        /// 是否啓用隊列事務
        /// </summary>
        public bool IsUseTransaction { get; set; } = true;

        /// <summary>
        /// 隊列的訪問方式
        /// </summary>
        public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive;

        /// <summary>
        /// 設置/獲取是否通過身份驗證
        /// </summary>
        public bool Authenticate { get; set; } = false;

        /// <summary>
        /// 獲取或設置基優先級,「消息隊列」使用該基優先級在網絡上傳送公共隊列的消息。
        /// </summary>
        public short BasePriority { get; set; } = 0;

    }
View Code

    隊列類型enum:

    /// <summary>
    /// 隊列類型
    /// </summary>
    public enum QueueType
    {
        /// <summary>
        /// 私有隊列
        /// </summary>
        PrivateQueue = 0,

        /// <summary>
        /// 公共隊列
        /// </summary>
        PublicQueue = 1,

        /// <summary>
        /// 管理隊列
        /// </summary>
        ManageQueue = 2,

        /// <summary>
        /// 響應隊列
        /// </summary>
        ResponseQueue = 3
    }
View Code
  • 建立隊列

      邏輯代碼裏面不直接調用隊列建立方法,直接用操做基類的構造方法建立。思路:初始化參數實體-》初始化待參數實體的操做基類實例-》實例裏面構造方法

判斷是否存在存在該隊列,若是存在就返回該隊列實例,若是不存在,就建立隊列,返回實例。

     基礎操做類:

        #region 構造函數

        /// <summary>
        /// 默認構造函數:不對外開放
        /// </summary>
        private MessageQueueBase() { }

        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="setting">消息隊列的配置</param>
        public MessageQueueBase(MessageQueueSettings setting)
        {
            try
            {
                this.msetting = setting;

                // 校驗是否存在隊列
                if (msetting.RemotePath == "." && !Exists())
                    this.mqueue = Create();
                else
                    this.mqueue = new MessageQueue(msetting.MessageQueuePath, msetting.AccessMode);

                if (msetting.RemotePath == ".")
                {
                    // 設置是否通過身份驗證:默認爲否
                    this.mqueue.Authenticate = msetting.Authenticate;

                    // 是否啓用日誌隊列
                    this.mqueue.UseJournalQueue = msetting.UseJournalQueue;

                    // 最大日誌隊列長度:最大值爲uint.MaxValue
                    this.mqueue.MaximumJournalSize =
                        msetting.MaximumJournalSize > uint.MaxValue ?
                        uint.MaxValue :
                        msetting.MaximumJournalSize;

                    // 最大隊列長度:最大值爲uint.MaxValue
                    this.mqueue.MaximumQueueSize = msetting.MaximumQueueSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize;

                    // 隊列標籤
                    this.mqueue.Label = msetting.MessageQueueLabel;
                }

                // 設置基優先級,默認是0
                this.mqueue.BasePriority = msetting.BasePriority;

                // 獲取消息隊列的id
                msetting.MessageQueueId = mqueue.Id;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

       ~MessageQueueBase()
        {
            this.mqueue?.Close();
            GC.SuppressFinalize(this);
        }
        #endregion

        #region 操做方法

        /// <summary>
        /// 驗證隊列是否存在
        /// </summary>        
        /// <returns>驗證結果</returns>
        public bool Exists()
        {
            var successed = false;
            try
            {
                successed = MessageQueue.Exists(msetting.MessageQueuePath);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            return successed;
        }

        /// <summary>
        /// 建立隊列
        /// </summary>
        /// <param name="type">隊列類型(默認是專用隊列)</param>
        /// <returns>隊列對象</returns>
        public MessageQueue Create()
        {
            MessageQueue queue = default(MessageQueue);
            try
            {
                queue = MessageQueue.Create(msetting.MessageQueuePath, msetting.IsUseTransaction);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            return queue;
        }

        /// <summary>
        /// 刪除隊列
        /// </summary>
        public void Delete()
        {
            try
            {
                MessageQueue.Delete(msetting.MessageQueuePath);
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        #endregion
View Code

    業務調用類:

        /// <summary>
        /// 建立消息隊列
        /// </summary>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="transaction">是不是事務隊列</param>
        /// <returns></returns>
        public bool CreateMessageQueue(string queueName, bool transaction)
        {
            bool result = false;
            MessageQueueSettings mqs = new MessageQueueSettings()
            {
                 MessageQueueName= queueName,
                 IsUseTransaction= transaction
            };
            IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
            result=messageQueueBase.Exists();
            return result;
        }
View Code

   測試結果:

     

     

  • 發送消息

      發送消息分爲單條發送和列表發送,是否啓用事務,以當前需求和隊列屬性決定。

      基礎操做類:

#region 發送操做
        /// <summary>
        /// 發送數據到隊列
        /// </summary>
        /// <typeparam name="T">發送的數據類型</typeparam>
        /// <param name="t">發送的數據對象</param>
        public void Send<T>(T t)
        {
            MessageQueueTransaction tran = null;
            try
            {
                tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                using (Message message = new Message())
                {
                    tran?.Begin();
                    message.Body = t;
                    message.Label = $"推送時間[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]";
                    message.UseDeadLetterQueue = true;
                    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0);
                    message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0);
                    if (tran != null)
                    {
                        mqueue.Send(message, tran);
                    }
                    else
                    {
                        mqueue.Send(message);
                    }
                    tran?.Commit();
                    tran?.Dispose();
                }
            }
            catch (Exception ex)
            {
                tran?.Abort();
                tran?.Dispose();
                throw ex;
            }
        }

        /// <summary>
        /// 發送多條數據
        /// </summary>
        /// <typeparam name="T">發送的數據類型</typeparam>
        /// <param name="ts">發送的數據對象集合</param>
        public void SendList<T>(IList<T> ts)
        {
            MessageQueueTransaction tran = null;
            try
            {
                tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                tran?.Begin();
                foreach (var item in ts)
                {
                    using (Message message = new Message())
                    {
                        message.Body = item;
                        message.Label = $"推送時間[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]";
                        message.UseDeadLetterQueue = true;
                        message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                        message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0);
                        message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0);
                        if (tran != null)
                        {
                            mqueue.Send(message, tran);
                        }
                        else
                        {
                            mqueue.Send(message);
                        }
                    }
                }
                tran?.Commit();
                tran?.Dispose();
            }
            catch (Exception ex)
            {
                tran?.Abort();
                tran?.Dispose();
                throw ex;
            }
        }

        #endregion
View Code

      業務調用類:

  /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="priority">優先級</param>
        /// <param name="isTransaction">是否啓用隊列事務</param>
        /// <param name="userInfo">新增用戶信息</param>
        public void SendMessage(string queueName, short priority, bool isTransaction, user userInfo)
        {
            MessageQueueSettings mqs = new MessageQueueSettings()
            {
                MessageQueueName = queueName,
                BasePriority = priority,
                IsUseTransaction = isTransaction
            };
            IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
            messageQueueBase.Send<user>(userInfo);
        }
View Code

      測試結果:

      

      

  • 接收消息

       消息接收分爲同步接收和異步接收。

       基礎操做類:

#region 接收方法

        /// <summary>
        /// 同步獲取隊列消息
        /// </summary>
        /// <typeparam name="T">返回的數據類型</typeparam>
        /// <param name="action">獲取數據後的回調</param>
        public void Receive<T>(Action<T> action) where T : class
        {
            try
            {
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                Message message = mqueue.Receive();
                T obj = message.Body as T;
                if (obj != null)
                    action(obj);
                else
                    throw new InvalidCastException("隊列獲取的類型與泛型類型不符");
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 同步查詢但不移除隊列第一條數據
        /// </summary>
        /// <typeparam name="T">返回的數據類型</typeparam>
        /// <param name="action">獲取數據後的回調</param>
        public void Peek<T>(Action<T> action) where T : class
        {
            try
            {
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                Message message = mqueue.Peek();
                T obj = message.Body as T;
                if (obj != null)
                    action(obj);
                else
                    throw new InvalidCastException("隊列獲取的類型與泛型類型不符");
            }
            catch (Exception ex)
            {

                throw ex;
            }
        }

        /// <summary>
        /// 獲取全部消息
        /// </summary>
        /// <typeparam name="T">數據類型</typeparam>
        /// <returns>取出來的數據</returns>
        public IList<T> GetAll<T>() where T : class
        {
            try
            {
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                var enumerator = mqueue.GetMessageEnumerator2();
                var ret = new List<T>();
                while (enumerator.MoveNext())
                {
                    var messageInfo = enumerator.Current.Body as T;
                    if (messageInfo != null)
                    {
                        ret.Add(messageInfo);
                    }
                    enumerator.RemoveCurrent();
                    enumerator.Reset();
                }
                return ret;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 異步接收隊列消息,並刪除接收的消息數據
        /// </summary>
        /// <typeparam name="T">返回的數據類型</typeparam>
        /// <param name="action">獲取數據後的回調</param>
        public void ReceiveAsync<T>(Action<T> action) where T : class
        {
            MessageQueueTransaction tran = null;
            try
            {
                tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;

                if (!initReceiveAsync)
                {
                    mqueue.ReceiveCompleted += (sender, e) =>
                    {
                        Message message = mqueue.EndReceive(e.AsyncResult);
                        T obj = message.Body as T;
                        if (obj != null)
                            action(obj);
                        else
                            throw new InvalidCastException("隊列獲取的類型與泛型類型不符");
                    };
                    initReceiveAsync = true;
                }
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                tran?.Begin();
                mqueue.BeginReceive();
                tran?.Commit();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 啓動一個沒有超時設定的異步查看操做。直到隊列中出現消息時,才完成此操做。
        /// </summary>
        /// <typeparam name="T">獲取的消息數據的類型</typeparam>
        /// <param name="action">獲取數據後的回調</param>
        public void PeekAsync<T>(Action<T> action) where T : class
        {
            MessageQueueTransaction tran = null;
            try
            {
                tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                if (!initPeekAsync)
                {
                    mqueue.PeekCompleted += (sender, e) =>
                    {
                        Message message = mqueue.EndPeek(e.AsyncResult);
                        T obj = message.Body as T;
                        if (obj != null)
                            action(obj);
                        else
                            throw new InvalidCastException("隊列獲取的類型與泛型類型不符");
                    };
                    initPeekAsync = true;
                }
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                tran?.Begin();
                mqueue.BeginPeek();
                tran?.Commit();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        /// <summary>
        /// 異步循環獲取隊列消息,調用此方法將一直接收隊列數據,直到IsStopReceiveLoop被設置爲true
        /// </summary>
        /// <typeparam name="T">返回的數據類型</typeparam>
        /// <param name="action">獲取數據後的回調</param>
        public void ReceiveAsyncLoop<T>(Action<T> action) where T : class
        {
            MessageQueueTransaction tran = null;
            try
            {
                tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                if (!initReceiveAsync)
                {
                    mqueue.ReceiveCompleted += (sender, e) =>
                    {
                        Message message = mqueue.EndReceive(e.AsyncResult);
                        T obj = message.Body as T;
                        if (obj != null)
                            action(obj);
                        else
                            throw new InvalidCastException("隊列獲取的類型與泛型類型不符");

                        //只要不中止就一直接收
                        if (!IsStopReceiveLoop)
                        {
                            tran?.Begin();
                            mqueue.BeginReceive();
                            tran?.Commit();
                        }
                    };
                    initReceiveAsync = true;
                }
                mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                IsStopReceiveLoop = false;
                tran?.Begin();
                mqueue.BeginReceive();
                tran?.Commit();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        #endregion
View Code

      業務調用類:

 /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="queueName">隊列</param>
        /// <param name="isAsy">是否異步</param>
        public user ReceiveMessage(string queueName, bool isAsy)
        {
            user userInfo = new user();
            MessageQueueSettings mqs = new MessageQueueSettings()
            {
                MessageQueueName = queueName
            };
            IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
            if (!isAsy)
            {
                messageQueueBase.Receive<user>(p =>
                {
                    userInfo = p;
                });
            }
            else
            {
                messageQueueBase.ReceiveAsync<user>(p =>
                {
                    userInfo = p;
                });
            }
            return userInfo;
        }

        /// <summary>
        /// 獲取所有
        /// </summary>
        /// <param name="queueName"></param>
        /// <returns></returns>
        public IList<user> GetAll(string queueName)
        {
            MessageQueueSettings mqs = new MessageQueueSettings()
            {
                MessageQueueName = queueName
            };
            IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
            return messageQueueBase.GetAll<user>();
        }
View Code

      測試結果:

      點擊插入數據,在隊列中插入N條數據。同步接收和異步接收每次只獲取隊列第一條數據;異步循環是每次獲取一條數據,循環獲取,知道隊列沒有數據;

接收所有是一次性獲取指定隊列全部數據。

     

     

補充

        下一篇寫Dapper應用或Redis應用。

相關文章
相關標籤/搜索