基於RabbitMQ.Client組件實現RabbitMQ可複用的 ConnectionPool(鏈接池)

1、本文產生起因:  html

 以前文章《總結消息隊列RabbitMQ的基本用法》已對RabbitMQ的安裝、用法都作了詳細說明,而本文主要是針對在高併發且單次從RabbitMQ中消費消息時,出現了鏈接數不足、鏈接響應較慢、RabbitMQ服務器崩潰等各類性能問題的解方案,之因此會出現我列舉的這些問題,究基根源,實際上是TCP鏈接建立與斷開太過頻繁所致,這與咱們使用ADO.NET來訪問常規的關係型DB(如:SQL SERVER、MYSQL)有所不一樣,在訪問DB時,咱們通常都建議你們使用using包裹,目的是每次建立完DB鏈接,使用完成後自動釋放鏈接,避免沒必要要的鏈接數及資源佔用。可能有人會問,爲什麼訪問DB,能夠每次建立再斷開鏈接,都沒有問題,而一樣訪問MQ(本文所指的MQ均是RabbitMQ),每次建立再斷開鏈接,若是在高併發且建立與斷開頻率高的時候,會出現性能問題呢?其實若是瞭解了DB的鏈接建立與斷開以及MQ的鏈接建立與斷開原理就知道其中的區別了。這裏我簡要說明一下,DB鏈接與MQ鏈接 其實底層都是基於TCP鏈接,建立TCP鏈接確定是有資源消耗的,是很是昂貴的,原則上儘量少的去建立與斷開TCP鏈接,DB建立鏈接、MQ建立鏈接能夠說是同樣的,但在斷開銷燬鏈接上就有很大的不一樣,DB建立鏈接再斷開時,默認狀況下是把該鏈接回收到鏈接池中,下次若是再有DB鏈接建立請求,則先判斷DB鏈接池中是否有空閒的鏈接,如有則直接複用,若沒有才建立鏈接,這樣就達到了TCP鏈接的複用,而MQ建立鏈接都是新建立的TCP鏈接,斷開時則直接斷開TCP鏈接,簡單粗暴,看似資源清理更完全,但若在高併發高頻率每次都從新建立與斷開MQ鏈接,則性能只會愈來愈差(上面說過TCP鏈接是很是昂貴的),我在公司項目中就出現了該問題,後面在技術總監的指導下,對MQ的鏈接建立與斷開做了優化,實現了相似DB鏈接池的概念。java

鏈接池,故名思義,鏈接的池子,全部的鏈接做爲一種資源集中存放在池中,須要使用時就能夠到池中獲取空閒鏈接資源,用完後再放回池中,以此達到鏈接資源的有效重用,同時也控制了資源的過分消耗與浪費(資源多少取決於池子的容量)緩存

2、源代碼奉獻(可直接複製應用到你們的項目中) 安全

下面就先貼出實現MQHelper(含鏈接池)的源代碼:服務器

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Web.Caching;
using System.Web;
using System.Configuration;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Runtime.CompilerServices;

namespace Zuowj.Core
{
    public class MQHelper
    {
        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";

        private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空閒鏈接對象隊列
        private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)鏈接對象集合
        private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//鏈接池使用率
        private readonly static Semaphore MQConnectionPoolSemaphore;
        private readonly static object freeConnLock = new object(), addConnLock = new object();
        private static int connCount = 0;

        public const int DefaultMaxConnectionCount = 30;//默認最大保持可用鏈接數
        public const int DefaultMaxConnectionUsingCount = 10000;//默認最大鏈接可訪問次數


        private static int MaxConnectionCount
        {
            get
            {
                if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
                {
                    return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
                }
                else
                {
                    int mqMaxConnectionCount = 0;
                    string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
                    if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
                    {
                        mqMaxConnectionCount = DefaultMaxConnectionCount;
                    }

                    string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                    HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));

                    return mqMaxConnectionCount;
                }

            }
        }

        /// <summary>
        /// 創建鏈接
        /// </summary>
        /// <param name="hostName">服務器地址</param>
        /// <param name="userName">登陸帳號</param>
        /// <param name="passWord">登陸密碼</param>
        /// <returns></returns>
        private static ConnectionFactory CrateFactory()
        {
            var mqConnectionSetting = GetMQConnectionSetting();
            var connectionfactory = new ConnectionFactory();
            connectionfactory.HostName = mqConnectionSetting[0];
            connectionfactory.UserName = mqConnectionSetting[1];
            connectionfactory.Password = mqConnectionSetting[2];
            if (mqConnectionSetting.Length > 3) //增長端口號
            {
                connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
            }
            return connectionfactory;
        }

        private static string[] GetMQConnectionSetting()
        {
            string[] mqConnectionSetting = null;
            if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
            {
                //MQConnectionSetting=Host IP|;userid;|;password
                string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
                if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
                {
                    mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);//解密MQ鏈接字符串,若項目中無此需求可移除,EncryptUtility是一個AES的加解密工具類,你們網上可自行查找
                    if (mqConnSettingStr.Contains(";|;"))
                    {
                        mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
                    }
                }

                if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
                {
                    throw new Exception("MQConnectionSetting未配置或配置不正確");
                }

                string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
            }
            else
            {
                mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
            }

            return mqConnectionSetting;
        }




        public static IConnection CreateMQConnection()
        {
            var factory = CrateFactory();
            factory.AutomaticRecoveryEnabled = true;//自動重連
            var connection = factory.CreateConnection();
            connection.AutoClose = false;
            return connection;
        }


        static MQHelper()
        {
            FreeConnectionQueue = new ConcurrentQueue<IConnection>();
            BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
            MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//鏈接池使用率
            MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信號量,控制同時併發可用線程數

        }

        public static IConnection CreateMQConnectionInPoolNew()
        {

        SelectMQConnectionLine:

            MQConnectionPoolSemaphore.WaitOne();//當<MaxConnectionCount時,會直接進入,不然會等待直到空閒鏈接出現

            IConnection mqConnection = null;
            if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//若是已有鏈接數小於最大可用鏈接數,則直接建立新鏈接
            {
                lock (addConnLock)
                {
                    if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
                    {
                        mqConnection = CreateMQConnection();
                        BusyConnectionDic[mqConnection] = true;//加入到忙鏈接集合中
                        MQConnectionPoolUsingDicNew[mqConnection] = 1;
                        //  BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                        return mqConnection;
                    }
                }
            }


            if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //若是沒有可用空閒鏈接,則從新進入等待排隊
            {
                // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
                goto SelectMQConnectionLine;
            }
            else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //若是取到空閒鏈接,判斷是否使用次數是否超過最大限制,超過則釋放鏈接並從新建立
            {
                mqConnection.Close();
                mqConnection.Dispose();
                // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);

                mqConnection = CreateMQConnection();
                MQConnectionPoolUsingDicNew[mqConnection] = 0;
                // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
            }

            BusyConnectionDic[mqConnection] = true;//加入到忙鏈接集合中
            MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1

            // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);

            return mqConnection;
        }

        private static void ResetMQConnectionToFree(IConnection connection)
        {
            lock (freeConnLock)
            {
                bool result = false;
                if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出
                {
                    //  BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                }
                else
                {
                    // BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                }

                if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//若是由於高併發出現極少機率的>MaxConnectionCount,則直接釋放該鏈接
                {
                    connection.Close();
                    connection.Dispose();
                }
                else
                {
                    FreeConnectionQueue.Enqueue(connection);//加入到空閒隊列,以便持續提供鏈接服務
                }

                MQConnectionPoolSemaphore.Release();//釋放一個空閒鏈接信號

                //Interlocked.Decrement(ref connCount);
                //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);
            }
        }


        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <typeparam name="T">消息類型</typeparam>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="msg">消息</param>
        /// <returns></returns>
        public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
        {
            try
            {

                using (var channel = connection.CreateModel())//創建通信信道
                {
                    // 參數從前面開始分別意思爲:隊列名稱,是否持久化,獨佔的隊列,不使用時是否自動刪除,其餘參數
                    channel.QueueDeclare(queueName, durable, false, false, null);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化

                    if (!durable)
                        properties = null;

                    var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish("", queueName, properties, body);
                }


                return string.Empty;
            }
            catch (Exception ex)
            {
                return ex.ToString();
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="dealMessage">消息處理函數</param>
        /// <param name="saveLog">保存日誌方法,可選</param>
        public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
        {
            try
            {

                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列 
                    channel.BasicQos(0, 1, false); //分發機制爲觸發式

                    var consumer = new QueueingBasicConsumer(channel); //創建消費者
                    // 從左到右參數意思分別是:隊列名稱、是否讀取消息後直接刪除消息,消費者
                    channel.BasicConsume(queueName, false, consumer);

                    while (true)  //若是隊列中有消息
                    {
                        ConsumeAction consumeResult = ConsumeAction.RETRY;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
                        string message = null;

                        try
                        {
                            var body = ea.Body;
                            message = Encoding.UTF8.GetString(body);
                            consumeResult = dealMessage(message);
                        }
                        catch (Exception ex)
                        {
                            if (saveLog != null)
                            {
                                saveLog(message, ex);
                            }
                        }
                        if (consumeResult == ConsumeAction.ACCEPT)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);  //消息從隊列中刪除
                        }
                        else if (consumeResult == ConsumeAction.RETRY)
                        {
                            channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
                        }
                    }
                }

            }
            catch (Exception ex)
            {
                if (saveLog != null)
                {
                    saveLog("QueueName:" + queueName, ex);
                }

                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }


        /// <summary>
        /// 依次獲取單個消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <param name="QueueName">隊列名稱</param>
        /// <param name="durable">持久化</param>
        /// <param name="dealMessage">處理消息委託</param>
        public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
        {
            try
            {

                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列 
                    channel.BasicQos(0, 1, false); //分發機制爲觸發式

                    uint msgCount = channel.MessageCount(QueueName);

                    if (msgCount > 0)
                    {
                        var consumer = new QueueingBasicConsumer(channel); //創建消費者
                        // 從左到右參數意思分別是:隊列名稱、是否讀取消息後直接刪除消息,消費者
                        channel.BasicConsume(QueueName, false, consumer);

                        ConsumeAction consumeResult = ConsumeAction.RETRY;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
                        try
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            consumeResult = dealMessage(message);
                        }
                        catch (Exception ex)
                        {
                            throw ex;
                        }
                        finally
                        {
                            if (consumeResult == ConsumeAction.ACCEPT)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);  //消息從隊列中刪除
                            }
                            else if (consumeResult == ConsumeAction.RETRY)
                            {
                                channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
                            }
                            else
                            {
                                channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
                            }
                        }
                    }
                    else
                    {
                        dealMessage(string.Empty);
                    }
                }

            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }
        }


        /// <summary>
        /// 獲取隊列消息數
        /// </summary>
        /// <param name="connection"></param>
        /// <param name="QueueName"></param>
        /// <returns></returns>
        public static int GetMessageCount(IConnection connection, string QueueName)
        {
            int msgCount = 0;
            try
            {

                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列 
                    msgCount = (int)channel.MessageCount(QueueName);
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                ResetMQConnectionToFree(connection);
            }

            return msgCount;
        }


    }

    public enum ConsumeAction
    {
        ACCEPT,  // 消費成功
        RETRY,   // 消費失敗,能夠放回隊列從新消費
        REJECT,  // 消費失敗,直接丟棄
    }
}

如今對上述代碼的核心點做一個簡要的說明:併發

先說一下靜態構造函數:app

FreeConnectionQueue 用於存放空閒鏈接對象隊列,爲什麼使用Queue,由於當我從中取出1個空閒鏈接後,空閒鏈接數就應該少1個,這個Queue很好知足這個需求,並且這個Queue是併發安全的Queue哦(ConcurrentQueue)函數

BusyConnectionDic 忙(使用中)鏈接對象集合,爲什麼這裏使用字典對象呢,由於當我用完後,須要可以快速的找出使用中的鏈接對象,並能快速移出,同時從新放入到空閒隊列FreeConnectionQueue ,達到鏈接複用高併發

MQConnectionPoolUsingDicNew 鏈接使用次數記錄集合,這個只是輔助記錄鏈接使用次數,以即可以計算一個鏈接的已使用次數,當達到最大使用次數時,則應斷開從新建立工具

MQConnectionPoolSemaphore 這個是信號量,這是控制併發鏈接的重要手段,鏈接池的容量等同於這個信號量的最大可並行數,保證同時使用的鏈接數不超過鏈接池的容量,若超過則會等待;

具體步驟說明:

1.MaxConnectionCount:最大保持可用鏈接數(能夠理解爲鏈接池的容量),能夠經過CONFIG配置,默認爲30; 

2.DefaultMaxConnectionUsingCount:默認最大鏈接可訪問次數,我這裏沒有使用配置,而是直接使用常量固定爲1000,你們如有須要能夠改爲從CONFIG配置,參考MaxConnectionCount的屬性設置(採起了依賴緩存)

3.CreateMQConnectionInPoolNew:從鏈接池中建立MQ鏈接對象,這個是核心方法,是實現鏈接池的地方,代碼中已註釋了重要的步驟邏輯,這裏說一下實現思路:

  3.1 經過MQConnectionPoolSemaphore.WaitOne() 利用信號量的並行等待方法,若是當前併發超過信號量的最大並行度(也就是做爲鏈接池的最大容量),則須要等待空閒鏈接池,防止鏈接數超過池的容量,若是併發沒有超過池的容量,則能夠進入獲取鏈接的邏輯;

  3.2FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount,若是空閒鏈接隊列+忙鏈接集合的總數小於鏈接池的容量,則能夠直接建立新的MQ鏈接,不然FreeConnectionQueue.TryDequeue(out mqConnection) 嘗試從空閒鏈接隊列中獲取一個可用的空閒鏈接使用,若空閒鏈接都沒有,則須要返回到方法首行,從新等待空閒鏈接;

  3.3MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen 若是取到空閒鏈接,則先判斷使用次數是否超過最大限制,超過則釋放鏈接或空閒鏈接已斷開鏈接也須要從新建立,不然該鏈接可用;

  3.4BusyConnectionDic[mqConnection] = true;加入到忙鏈接集合中,MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1; 使用次數加1,確保每使用一次鏈接,鏈接次數能記錄

4.ResetMQConnectionToFree:重置釋放鏈接對象,這個是保證MQ鏈接用完後可以回收到空閒鏈接隊列中(即:回到鏈接池中),而不是直接斷開鏈接,這個方法很簡單就不做做過多說明。

 好了,都說明了如何實現含鏈接池的MQHelper,如今再來舉幾個例子來講明如何用:

3、實際應用(簡單易上手)

獲取並消費一個消息:

        public string GetMessage(string queueName)
        {
            string message = null;
            try
            {
                var connection = MQHelper.CreateMQConnectionInPoolNew();

                MQHelper.ConsumeMsgSingle(connection, queueName, true, (msg) =>
                {
                    message = msg;
                    return ConsumeAction.ACCEPT;
                });
            }
            catch (Exception ex)
            {
                BaseUtil.Logger.Error(string.Format("MQHelper.ConsumeMsgSingle Error:{0}", ex.Message), ex);
                message = "ERROR:" + ex.Message;
            }

            //BaseUtil.Logger.InfoFormat("第{0}次請求,從消息隊列(隊列名稱:{1})中獲取消息值爲:{2}", Interlocked.Increment(ref requestCount), queueName, message);


            return message;


        }

 發送一個消息:

        public string SendMessage(string queueName, string msg)
        {
            string result = null;
            try
            {
                var connection = MQHelper.CreateMQConnectionInPoolNew();

                result = MQHelper.SendMsg(connection, queueName, msg);
            }
            catch (Exception ex)
            {
                BaseUtil.Logger.Error(string.Format("MQHelper.SendMessage Error:{0}", ex.Message), ex);
                result = ex.Message;
            }

            return result;
        }

 獲取消息隊列消息數:

        public int GetMessageCount(string queueName)
        {
            int result = -1;
            try
            {
                var connection = MQHelper.CreateMQConnectionInPoolNew();

                result = MQHelper.GetMessageCount(connection, queueName);
            }
            catch (Exception ex)
            {
                BaseUtil.Logger.Error(string.Format("MQHelper.GetMessageCount Error:{0}", ex.Message), ex);
                result = -1;
            }

            return result;
        }

 這裏說一下:BaseUtil.Logger 是Log4Net的實例對象,另外上面沒有針對持續訂閱消費消息(ConsumeMsg)做說明,由於這個其實能夠不用鏈接池也不會有問題,由於它是一個持久訂閱並持久消費的過程,不會出現頻繁建立鏈接對象的狀況。

 最後要說的是,雖然說代碼貼出來,你們一看就以爲很簡單,好像沒有什麼技術含量,但若是沒有完整的思路也仍是須要花費一些時間和精力的,代碼中核心是如何簡單高效的解決併發及鏈接複用的的問題,該MQHelper有通過壓力測試並順利在我司項目中使用,完美解決了以前的問題,因爲這個方案是我在公司通宵實現的,可能有一些方面的不足,你們能夠相互交流或完善後入到本身的項目中。

 2019-7-3更新:優化解決當已緩存的鏈接不可用時,致使沒法複用,鏈接池一直被無效的長鏈接佔滿問題,以及處理消息時增長失敗自動重試功能,代碼以下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Web.Caching;
using System.Web;
using System.Configuration;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Net.Sockets;

namespace KYLDMQService.Core
{
    public class MQHelper
    {
        private const string CacheKey_MQConnectionSetting = "MQConnectionSetting";
        private const string CacheKey_MQMaxConnectionCount = "MQMaxConnectionCount";


        public const int DefaultMaxConnectionCount = 30;//默認最大保持可用鏈接數
        public const int DefaultMaxConnectionUsingCount = 10000;//默認最大鏈接可訪問次數
        public const int DefaultReTryConnectionCount = 1;//默認重試鏈接次數

        private static int MaxConnectionCount
        {
            get
            {
                if (HttpRuntime.Cache[CacheKey_MQMaxConnectionCount] != null)
                {
                    return Convert.ToInt32(HttpRuntime.Cache[CacheKey_MQMaxConnectionCount]);
                }
                else
                {
                    int mqMaxConnectionCount = 0;
                    string mqMaxConnectionCountStr = ConfigurationManager.AppSettings[CacheKey_MQMaxConnectionCount];
                    if (!int.TryParse(mqMaxConnectionCountStr, out mqMaxConnectionCount) || mqMaxConnectionCount <= 0)
                    {
                        mqMaxConnectionCount = DefaultMaxConnectionCount;
                    }

                    string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                    HttpRuntime.Cache.Insert(CacheKey_MQMaxConnectionCount, mqMaxConnectionCount, new CacheDependency(appConfigPath));

                    return mqMaxConnectionCount;
                }

            }
        }

        /// <summary>
        /// 創建鏈接
        /// </summary>
        /// <param name="hostName">服務器地址</param>
        /// <param name="userName">登陸帳號</param>
        /// <param name="passWord">登陸密碼</param>
        /// <returns></returns>
        private static ConnectionFactory CrateFactory()
        {
            var mqConnectionSetting = GetMQConnectionSetting();
            var connectionfactory = new ConnectionFactory();
            connectionfactory.HostName = mqConnectionSetting[0];
            connectionfactory.UserName = mqConnectionSetting[1];
            connectionfactory.Password = mqConnectionSetting[2];
            if (mqConnectionSetting.Length > 3) //增長端口號
            {
                connectionfactory.Port = Convert.ToInt32(mqConnectionSetting[3]);
            }
            return connectionfactory;
        }

        private static string[] GetMQConnectionSetting()
        {
            string[] mqConnectionSetting = null;
            if (HttpRuntime.Cache[CacheKey_MQConnectionSetting] == null)
            {
                //MQConnectionSetting=Host IP|;userid;|;password
                string mqConnSettingStr = ConfigurationManager.AppSettings[CacheKey_MQConnectionSetting];
                if (!string.IsNullOrWhiteSpace(mqConnSettingStr))
                {
                    mqConnSettingStr = EncryptUtility.Decrypt(mqConnSettingStr);
                    if (mqConnSettingStr.Contains(";|;"))
                    {
                        mqConnectionSetting = mqConnSettingStr.Split(new[] { ";|;" }, StringSplitOptions.RemoveEmptyEntries);
                    }
                }

                if (mqConnectionSetting == null || mqConnectionSetting.Length < 3)
                {
                    throw new Exception("MQConnectionSetting未配置或配置不正確");
                }

                string appConfigPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "App.config");
                HttpRuntime.Cache.Insert(CacheKey_MQConnectionSetting, mqConnectionSetting, new CacheDependency(appConfigPath));
            }
            else
            {
                mqConnectionSetting = HttpRuntime.Cache[CacheKey_MQConnectionSetting] as string[];
            }

            return mqConnectionSetting;
        }




        public static IConnection CreateMQConnection()
        {
            var factory = CrateFactory();
            factory.AutomaticRecoveryEnabled = true;//自動重連
            var connection = factory.CreateConnection();
            connection.AutoClose = false;
            return connection;
        }

 


        private readonly static ConcurrentQueue<IConnection> FreeConnectionQueue;//空閒鏈接對象隊列
        private readonly static ConcurrentDictionary<IConnection, bool> BusyConnectionDic;//使用中(忙)鏈接對象集合
        private readonly static ConcurrentDictionary<IConnection, int> MQConnectionPoolUsingDicNew;//鏈接池使用率
        private readonly static Semaphore MQConnectionPoolSemaphore;
        private readonly static object freeConnLock = new object(), addConnLock = new object();
        private static int connCount = 0;
        static MQHelper()
        {
            FreeConnectionQueue = new ConcurrentQueue<IConnection>();
            BusyConnectionDic = new ConcurrentDictionary<IConnection, bool>();
            MQConnectionPoolUsingDicNew = new ConcurrentDictionary<IConnection, int>();//鏈接池使用率
            MQConnectionPoolSemaphore = new Semaphore(MaxConnectionCount, MaxConnectionCount, "MQConnectionPoolSemaphore");//信號量,控制同時併發可用線程數

        }

        public static IConnection CreateMQConnectionInPoolNew()
        {

            MQConnectionPoolSemaphore.WaitOne(10000);//當<MaxConnectionCount時,會直接進入,不然會等待直到空閒鏈接出現
            //Interlocked.Increment(ref connCount);
            //BaseUtil.Logger.DebugFormat("thread Concurrent count:{0}", connCount);
            //int totalCount = FreeConnectionQueue.Count + BusyConnectionDic.Count;
            //BaseUtil.Logger.DebugFormat("totalCount:{0}", totalCount);
            //if (totalCount > MaxConnectionCount)
            //{
            //    System.Diagnostics.Debug.WriteLine("ConnectionCount:" + totalCount);
            //    BaseUtil.Logger.DebugFormat("more than totalCount:{0}",totalCount);
            //}
            IConnection mqConnection = null;

            try
            {
                if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)//若是已有鏈接數小於最大可用鏈接數,則直接建立新鏈接
                {
                    lock (addConnLock)
                    {
                        if (FreeConnectionQueue.Count + BusyConnectionDic.Count < MaxConnectionCount)
                        {
                            mqConnection = CreateMQConnection();
                            BusyConnectionDic[mqConnection] = true;//加入到忙鏈接集合中
                            MQConnectionPoolUsingDicNew[mqConnection] = 1;
                            //  BaseUtil.Logger.DebugFormat("Create a MQConnection:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                            return mqConnection;
                        }
                    }
                }


                if (!FreeConnectionQueue.TryDequeue(out mqConnection)) //若是沒有可用空閒鏈接,則從新進入等待排隊
                {
                    // BaseUtil.Logger.DebugFormat("no FreeConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
                    return CreateMQConnectionInPoolNew();
                }
                else if (MQConnectionPoolUsingDicNew[mqConnection] + 1 > DefaultMaxConnectionUsingCount || !mqConnection.IsOpen) //若是取到空閒鏈接,判斷是否使用次數是否超過最大限制,超過則釋放鏈接並從新建立
                {
                    if (mqConnection.IsOpen)
                    {
                        mqConnection.Close();
                    }

                    mqConnection.Dispose();

                    // BaseUtil.Logger.DebugFormat("close > DefaultMaxConnectionUsingCount mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);

                    mqConnection = CreateMQConnection();
                    MQConnectionPoolUsingDicNew[mqConnection] = 0;
                    // BaseUtil.Logger.DebugFormat("create new mqConnection,FreeConnectionCount:{0}, BusyConnectionCount:{1}", FreeConnectionQueue.Count, BusyConnectionDic.Count);
                }

                BusyConnectionDic[mqConnection] = true;//加入到忙鏈接集合中
                MQConnectionPoolUsingDicNew[mqConnection] = MQConnectionPoolUsingDicNew[mqConnection] + 1;//使用次數加1

                // BaseUtil.Logger.DebugFormat("set BusyConnectionDic:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", mqConnection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);

                return mqConnection;

            }
            catch //若是在建立鏈接發生錯誤,則判斷當前是否已得到Connection,若是得到則釋放鏈接,最終都會釋放鏈接池計數
            {
                if (mqConnection != null)
                {
                    ResetMQConnectionToFree(mqConnection);
                }
                else
                {
                    MQConnectionPoolSemaphore.Release();
                }

                throw;
            }
        }

        private static void ResetMQConnectionToFree(IConnection connection)
        {
            try
            {
                lock (freeConnLock)
                {
                    bool result = false;
                    if (BusyConnectionDic.TryRemove(connection, out result)) //從忙隊列中取出
                    {
                        //  BaseUtil.Logger.DebugFormat("set FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                    }
                    else//若極小機率移除失敗,則再重試一次
                    {
                        if (!BusyConnectionDic.TryRemove(connection, out result))
                        {
                            BaseUtil.Logger.DebugFormat("failed TryRemove BusyConnectionDic(2 times):{0},FreeConnectionCount:{1}, BusyConnectionCount:{2}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count);
                        }
                    }

                    if (FreeConnectionQueue.Count + BusyConnectionDic.Count > MaxConnectionCount)//若是由於高併發出現極少機率的>MaxConnectionCount,則直接釋放該鏈接
                    {
                        connection.Close();
                        connection.Dispose();
                    }
                    else if (connection.IsOpen)//若是是OPEN狀態才加入空閒隊列,不然直接丟棄
                    {
                        FreeConnectionQueue.Enqueue(connection);//加入到空閒隊列,以便持續提供鏈接服務
                    }

                }
            }
            catch
            {
                throw;
            }
            finally
            {
                MQConnectionPoolSemaphore.Release();//釋放一個空閒鏈接信號
            }

            //Interlocked.Decrement(ref connCount);
            //BaseUtil.Logger.DebugFormat("Enqueue FreeConnectionQueue:{0},FreeConnectionCount:{1}, BusyConnectionCount:{2},thread count:{3}", connection.GetHashCode().ToString(), FreeConnectionQueue.Count, BusyConnectionDic.Count,connCount);

        }


        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <typeparam name="T">消息類型</typeparam>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="msg">消息</param>
        /// <returns></returns>
        public static string SendMsg(IConnection connection, string queueName, string msg, bool durable = true)
        {
            bool reTry = false;
            int reTryCount = 0;
            string sendErrMsg = null;

            do
            {
                reTry = false;
                try
                {
                    using (var channel = connection.CreateModel())//創建通信信道
                    {
                        // 參數從前面開始分別意思爲:隊列名稱,是否持久化,獨佔的隊列,不使用時是否自動刪除,其餘參數
                        channel.QueueDeclare(queueName, durable, false, false, null);

                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 2;//1表示不持久,2.表示持久化

                        if (!durable)
                            properties = null;

                        var body = Encoding.UTF8.GetBytes(msg);
                        channel.BasicPublish("", queueName, properties, body);
                    }

                    sendErrMsg = string.Empty;
                }
                catch (Exception ex)
                {
                    if (BaseUtil.IsIncludeException<SocketException>(ex))
                    {
                        if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
                        {
                            ResetMQConnectionToFree(connection);
                            connection = CreateMQConnectionInPoolNew();
                            reTry = true;
                        }
                    }

                    sendErrMsg = ex.ToString();
                }
                finally
                {
                    if (!reTry)
                    {
                        ResetMQConnectionToFree(connection);
                    }

                }

            } while (reTry);

            return sendErrMsg;

        }

        /// <summary>
        /// 消費消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="dealMessage">消息處理函數</param>
        /// <param name="saveLog">保存日誌方法,可選</param>
        public static void ConsumeMsg(IConnection connection, string queueName, bool durable, Func<string, ConsumeAction> dealMessage, Action<string, Exception> saveLog = null)
        {
            try
            {

                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queueName, durable, false, false, null); //獲取隊列 
                    channel.BasicQos(0, 1, false); //分發機制爲觸發式

                    var consumer = new QueueingBasicConsumer(channel); //創建消費者
                    // 從左到右參數意思分別是:隊列名稱、是否讀取消息後直接刪除消息,消費者
                    channel.BasicConsume(queueName, false, consumer);

                    while (true)  //若是隊列中有消息
                    {
                        ConsumeAction consumeResult = ConsumeAction.RETRY;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息
                        string message = null;

                        try
                        {
                            var body = ea.Body;
                            message = Encoding.UTF8.GetString(body);
                            consumeResult = dealMessage(message);
                        }
                        catch (Exception ex)
                        {
                            if (saveLog != null)
                            {
                                saveLog(message, ex);
                            }
                        }
                        if (consumeResult == ConsumeAction.ACCEPT)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);  //消息從隊列中刪除
                        }
                        else if (consumeResult == ConsumeAction.RETRY)
                        {
                            channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
                        }
                    }
                }

            }
            catch (Exception ex)
            {
                if (saveLog != null)
                {
                    saveLog("QueueName:" + queueName, ex);
                }

                throw ex;
            }
            finally
            {
                //MQConnectionPool[connection] = false;//改成空閒
                ResetMQConnectionToFree(connection);
            }
        }


        /// <summary>
        /// 依次獲取單個消息
        /// </summary>
        /// <param name="connection">消息隊列鏈接對象</param>
        /// <param name="QueueName">隊列名稱</param>
        /// <param name="durable">持久化</param>
        /// <param name="dealMessage">處理消息委託</param>
        public static void ConsumeMsgSingle(IConnection connection, string QueueName, bool durable, Func<string, ConsumeAction> dealMessage)
        {
            bool reTry = false;
            int reTryCount = 0;
            ConsumeAction consumeResult = ConsumeAction.RETRY;
            IModel channel = null;
            BasicDeliverEventArgs ea = null;
            do
            {
                reTry = false;
                try
                {
                    channel = connection.CreateModel();

                    channel.QueueDeclare(QueueName, durable, false, false, null); //獲取隊列 
                    channel.BasicQos(0, 1, false); //分發機制爲觸發式

                    uint msgCount = channel.MessageCount(QueueName);

                    if (msgCount > 0)
                    {
                        var consumer = new QueueingBasicConsumer(channel); //創建消費者
                        // 從左到右參數意思分別是:隊列名稱、是否讀取消息後直接刪除消息,消費者
                        channel.BasicConsume(QueueName, false, consumer);

                        ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //獲取消息

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        consumeResult = dealMessage(message);

                    }
                    else
                    {
                        dealMessage(string.Empty);
                    }


                }
                catch (Exception ex)
                {

                    if (BaseUtil.IsIncludeException<SocketException>(ex))
                    {
                        if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
                        {
                            if (channel != null) channel.Dispose();

                            ResetMQConnectionToFree(connection);
                            connection = CreateMQConnectionInPoolNew();
                            reTry = true;
                        }
                    }

                    throw ex;
                }
                finally
                {
                    if (!reTry)
                    {
                        if (channel != null && ea != null)
                        {
                            if (consumeResult == ConsumeAction.ACCEPT)
                            {
                                channel.BasicAck(ea.DeliveryTag, false);  //消息從隊列中刪除
                            }
                            else if (consumeResult == ConsumeAction.RETRY)
                            {
                                channel.BasicNack(ea.DeliveryTag, false, true); //消息重回隊列
                            }
                            else
                            {
                                channel.BasicNack(ea.DeliveryTag, false, false); //消息直接丟棄
                            }
                        }

                        if (channel != null) channel.Dispose();

                        ResetMQConnectionToFree(connection);
                    }
                }

            } while (reTry);


        }


        /// <summary>
        /// 獲取隊列消息數
        /// </summary>
        /// <param name="connection"></param>
        /// <param name="QueueName"></param>
        /// <returns></returns>
        public static int GetMessageCount(IConnection connection, string QueueName)
        {
            int msgCount = 0;
            bool reTry = false;
            int reTryCount = 0;

            do
            {
                reTry = false;
                try
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(QueueName, true, false, false, null); //獲取隊列 
                        msgCount = (int)channel.MessageCount(QueueName);
                    }
                }
                catch (Exception ex)
                {
                    if (BaseUtil.IsIncludeException<SocketException>(ex))
                    {
                        if ((++reTryCount) <= DefaultReTryConnectionCount)//可重試1次
                        {
                            ResetMQConnectionToFree(connection);
                            connection = CreateMQConnectionInPoolNew();
                            reTry = true;
                        }
                    }

                    throw ex;
                }
                finally
                {
                    if (!reTry)
                    {
                        ResetMQConnectionToFree(connection);
                    }
                }

            } while (reTry);

            return msgCount;
        }


    }

    public enum ConsumeAction
    {
        ACCEPT,  // 消費成功
        RETRY,   // 消費失敗,能夠放回隊列從新消費
        REJECT,  // 消費失敗,直接丟棄
    }
}
相關文章
相關標籤/搜索