RabbitMQ C#客戶端自動重連

重要參考文章來源:http://gigi.nullneuron.net/gigilabs/resilient-connections-with-rabbitmq-net-client/服務器

參考代碼:https://bitbucket.org/dandago/gigilabs/src/bba0d457869f3283fa9f47a52e9bc009f29afc9d/ResilientRabbitMqConnectivity/?at=master網絡

 

緣由是這樣的,我在Windows客戶端有一個Windows後臺服務,負責與服務端的數據交互,數據上傳及數據下載架構

1.數據上傳部分是使用的rabbitmq donnet庫發送消息至RabbittMQ服務器,服務器另外有一個應用程序會監控RabbitMQ服務器的指定隊列,完成數據的上傳服務併發

 

2.數據下載部分是使用的rabbitmq donnet庫監控RabbitMQ服務器指定的隊列,服務器應用程序將數據發送到指定的RabbitMQ服務器的隊列中,客戶端就會能獲取到服務器應用發送過來的數據函數

 

這樣的架構仍是比較好用的,使用的效果目前還行,但遇到一個比較頭痛的問題,Windows後臺服務一直在Windows平板電腦上運行,除非手動安裝及更新應用時,纔會將Windows服務進行從新安裝或重啓,其餘的狀況是不會進行重啓的oop

瞭解到RabbitMQ是有自動重連的技術的,能夠參考地址:https://yq.aliyun.com/articles/369969spa

 

這個效果只做用於,服務器沒有掛掉,只是中間有一些網絡問題時才能夠進行重連.net

但有一種狀況是沒有處理到的,咱們已經在客戶端對RabbitMQ某個隊列進行監控,但服務器忽然掛掉,而後幾分鐘後從新啓動了,這時,客戶端能夠從新創建鏈接,但卻不會自動對隊列產生監控,沒法拿到消息線程

現時對代碼作出一些處理code

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

using System.Threading;
using RabbitMQ.Client.Events;
using System.IO;

namespace MES_MonitoringService.Common
{
    public class RabbitMQClientHandler
    {
        private static string defaultRabbitMQHostName = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerHostName");
        private static string defaultRabbitMQPort = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerPort");
        private static string defaultRabbitMQUserName = Common.ConfigFileHandler.GetAppConfig("RabbitMQUserName");
        private static string defaultRabbitMQPassword = Common.ConfigFileHandler.GetAppConfig("RabbitMQPassword");
        private static string defaultRabbitVirtualHost = Common.ConfigFileHandler.GetAppConfig("RabbitMQVirtualHost");

        // 定義一個靜態變量來保存類的實例
        private static RabbitMQClientHandler uniqueInstance;
        //定義一個標識確保線程同步 
        private static readonly object locker = new object();

        /*-------------------------------------------------------------------------------------*/

        //ConnectionFactory
        private static ConnectionFactory mc_ConnectionFactory = null;
        //Connection
        private static IConnection Connection;

        //發送頻道及接收頻道分開,避免互相影響,致使整個服務不可用
        //Send Channel
        private static IModel SendChannel;
        //Listen Channel
        private static IModel ListenChannel;

        //數據監控隊列
        private static string MC_SyncDataConsume = string.Empty;

        //
        //private SyncDataHandler syncDataHandlerClass;

        /*-------------------------------------------------------------------------------------*/

        /// <summary>
        /// 定義私有構造函數,使外界不能建立該類實例
        /// </summary>
        public RabbitMQClientHandler()
        {
            Reconnect();            
        }

        /// <summary>
        /// 定義公有方法提供一個全局訪問點,同時你也能夠定義公有屬性來提供全局訪問點
        /// </summary>
        /// <returns></returns>
        public static RabbitMQClientHandler GetInstance()
        {
            // 當第一個線程運行到這裏時,此時會對locker對象 "加鎖",
            // 當第二個線程運行該方法時,首先檢測到locker對象爲"加鎖"狀態,該線程就會掛起等待第一個線程解鎖
            // lock語句運行完以後(即線程運行完以後)會對該對象"解鎖"
            // 雙重鎖定只須要一句判斷就能夠了
            if (uniqueInstance == null)
            {
                lock (locker)
                {
                    // 若是類的實例不存在則建立,不然直接返回
                    if (uniqueInstance == null)
                    {
                        uniqueInstance = new RabbitMQClientHandler();
                    }
                }
            }
            return uniqueInstance;
        }

        static void Connect()
        {
            try
            {
                Common.LogHandler.WriteLog("獲取RabbitMQ服務器參數:" + defaultRabbitMQHostName + ":" + defaultRabbitMQPort + " (" + defaultRabbitMQUserName + "/" + defaultRabbitMQPassword + ")");
                //鏈接工廠
                mc_ConnectionFactory = new ConnectionFactory();

                //鏈接工廠信息
                mc_ConnectionFactory.HostName = defaultRabbitMQHostName;// "localhost";

                int rabbitmq_port = 5672;// 默認是5672端口
                int.TryParse(defaultRabbitMQPort, out rabbitmq_port);
                mc_ConnectionFactory.Port = rabbitmq_port;// "5672"

                mc_ConnectionFactory.UserName = defaultRabbitMQUserName;// "guest";
                mc_ConnectionFactory.Password = defaultRabbitMQPassword;// "guest";
                mc_ConnectionFactory.VirtualHost = defaultRabbitVirtualHost;// "/"

                mc_ConnectionFactory.RequestedHeartbeat = 30;//心跳包
                mc_ConnectionFactory.AutomaticRecoveryEnabled = true;//自動重連
                mc_ConnectionFactory.TopologyRecoveryEnabled = true;//拓撲重連
                mc_ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

                //建立鏈接
                Connection = mc_ConnectionFactory.CreateConnection();

                //斷開鏈接時,調用方法自動重連
                Connection.ConnectionShutdown += Connection_ConnectionShutdown;

                //建立發送頻道
                SendChannel = Connection.CreateModel();
                //建立接收頻道
                ListenChannel = Connection.CreateModel();

                //發送頻道確認模式,發送了消息後,能夠收到迴應
                SendChannel.ConfirmSelect();

                if(!string.IsNullOrEmpty(MC_SyncDataConsume))
                {
                    //從新監控消息
 RabbitmqMessageConsume(MC_SyncDataConsume); }

                Common.LogHandler.WriteLog("嘗試鏈接至RabbitMQ服務器:" + defaultRabbitMQHostName);
            }
            catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e)
            {
                throw e;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        static void Cleanup()
        {
            try
            {

                if (SendChannel != null && SendChannel.IsOpen)
                {
                    try
                    {
                        SendChannel.Close();
                    }
                    catch (Exception ex)
                    {
                        LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的Channel[發送],但遇到錯誤", ex);
                    }
                    SendChannel = null;
                }

                if (ListenChannel != null && ListenChannel.IsOpen)
                {
                    try
                    {
                        ListenChannel.Close();
                    }
                    catch (Exception ex)
                    {
                        LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的Channel[接收],但遇到錯誤", ex);
                    }
                    ListenChannel = null;
                }

                if (Connection != null && Connection.IsOpen)
                {
                    try
                    {
                        Connection.Close();
                    }
                    catch (Exception ex)
                    {
                        LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的鏈接,但遇到錯誤", ex);
                    }
                    Connection = null;
                }
            }
            catch (IOException ex)
            {
                throw ex;
            }
        }

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            LogHandler.WriteLog("):  RabbitMQ已經斷開鏈接,正在嘗試從新鏈接至RabbitMQ服務器");

            Reconnect();
        }

        private static void Reconnect()
        {
            try
            {
                //清除鏈接及頻道
                Cleanup();

                var mres = new ManualResetEventSlim(false); // state is initially false
                while (!mres.Wait(3000)) // loop until state is true, checking every 3s
                {
                    try
                    {
                        //鏈接
                        Connect();
                        
                        mres.Set(); // state set to true - breaks out of loop
                    }
                    catch (Exception ex)
                    {
                        LogHandler.WriteLog("RabbitMQ嘗試鏈接RabbitMQ服務器出現錯誤:" + ex.Message, ex);
                    }
                }
            }
            catch (Exception ex)
            {
                LogHandler.WriteLog("RabbitMQ嘗試從新鏈接RabbitMQ服務器出現錯誤:" + ex.Message, ex);
            }
        }

        /*-------------------------------------------------------------------------------------*/


        /// <summary>
        /// Direct路由,發送消息至服務端
        /// </summary>
        /// <param name="exchangeName">交換機名稱</param>
        /// <param name="routingKey">RoutingKey</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息內容</param>
        /// <returns></returns>
        public bool DirectExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
        {
            try
            {
                if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉");
                if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉");

                //建立一個持久化的隊列
                bool queueDurable = true;

                string QueueName = queueName;
                string ExchangeName = exchangeName;
                string RoutingKey = routingKey;

                //聲明交換機
                SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Direct);
                //聲明隊列
                SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
                //路由綁定隊列
                SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);

                //設置消息持久性
                IBasicProperties props = SendChannel.CreateBasicProperties();
                props.ContentType = "text/plain";
                props.DeliveryMode = 2;//持久性

                //消息內容轉碼,併發送至服務器
                var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
                SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);

                //等待確認
                return SendChannel.WaitForConfirms();

            }
            catch (Exception ex)
            {
                LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);

                return false;
            }
        }

        /// <summary>
        /// Fanout路由,發送消息至服務端
        /// </summary>
        /// <param name="exchangeName">交換機名稱</param>
        /// <param name="routingKey">RoutingKey</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息內容</param>
        /// <returns></returns>
        public bool FanoutExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
        {
            try
            {
                if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉");
                if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉");

                //建立一個持久化的頻道
                bool queueDurable = true;

                string QueueName = queueName;
                string ExchangeName = exchangeName;
                string RoutingKey = routingKey;

                //聲明交換機
                SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout);
                //聲明隊列
                SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
                //路由綁定隊列
                SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);

                //設置消息持久性
                IBasicProperties props = SendChannel.CreateBasicProperties();
                props.ContentType = "text/plain";
                props.DeliveryMode = 2;//持久性

                //消息內容轉碼,併發送至服務器
                var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
                SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);

                //等待確認
                return SendChannel.WaitForConfirms();

            }
            catch (Exception ex)
            {
                LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);

                return false;
            }
        }

        /// <summary>
        /// Topic路由,發送消息至服務端
        /// </summary>
        /// <param name="exchangeName">交換機名稱</param>
        /// <param name="routingKey">RoutingKey</param>
        /// <param name="queueName">隊列名稱</param>
        /// <param name="message">消息內容</param>
        /// <returns></returns>
        public bool TopicExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message)
        {
            try
            {
                if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉");
                if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉");

                //建立一個持久化的頻道
                bool queueDurable = true;

                string QueueName = queueName;
                string ExchangeName = exchangeName;
                string RoutingKey = routingKey;

                //聲明交換機
                SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
                //聲明隊列
                SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
                //路由綁定隊列
                SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null);

                //設置消息持久性
                IBasicProperties props = SendChannel.CreateBasicProperties();
                props.ContentType = "text/plain";
                props.DeliveryMode = 2;//持久性

                //消息內容轉碼,併發送至服務器
                var messageBody = System.Text.Encoding.UTF8.GetBytes(message);
                SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody);

                //等待確認
                return SendChannel.WaitForConfirms();

            }
            catch (Exception ex)
            {
                LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex);

                return false;
            }
        }


        /// <summary>
        /// Topic路由,接收同步消息
        /// </summary>
        /// <param name="queueName">監聽的隊列</param>
        public void SyncDataFromServer(string queueName)
        {
            try
            {
                //設定參數,方便重啓RabbitMQ服務器時處理
                MC_SyncDataConsume = queueName;
                RabbitmqMessageConsume(MC_SyncDataConsume);
            }
            catch (Exception ex)
            {
                LogHandler.WriteLog("TopicExchangeConsumeMessageFromServer運行錯誤:" + ex.Message, ex);
                throw ex;
            }           
        }

        private static void RabbitmqMessageConsume(string queueName)
        {
            try
            {
                if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉");
                if (ListenChannel == null || !ListenChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉");                


                bool queueDurable = true;
                string QueueName = queueName;

                //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立
                ListenChannel.QueueDeclare(QueueName, queueDurable, false, false, null);
                //輸入1,那若是接收一個消息,可是沒有應答,則客戶端不會收到下一個消息
                ListenChannel.BasicQos(0, 1, false);

                //建立基於該隊列的消費者,綁定事件
                var consumer = new EventingBasicConsumer(ListenChannel);

                //迴應消息監控
                consumer.Received += SyncData_Received;

                //綁定消費者
                ListenChannel.BasicConsume(QueueName, //隊列名
                                      false,    //false:手動應答;true:自動應答
                                      consumer);

                Common.LogHandler.WriteLog("開始監控RabbitMQ服務器,隊列" + QueueName);

            }
            catch (Exception ex)
            {                
                throw ex;
            }
        }


        private static void SyncData_Received(object sender, BasicDeliverEventArgs e)
        {
            try
            {
                //TOOD 驗證程序退出後消費者是否退出去了
                var body = e.Body; //消息主體
                var message = Encoding.UTF8.GetString(body);

                LogHandler.WriteLog("[x] 隊列接收到消息:" + message.ToString());

                //處理數據
                bool processSuccessFlag = new SyncDataHandler().ProcessSyncData(message);
                if (processSuccessFlag)
                {
                    //回覆確認
                    ListenChannel.BasicAck(e.DeliveryTag, false);
                }
                else
                {
                    //未正常處理的消息,從新放回隊列
                    ListenChannel.BasicReject(e.DeliveryTag, true);
                }
            }
            catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex1)
            {
                Thread.Sleep(5000);
                ListenChannel.BasicNack(e.DeliveryTag, false, true);
            }
            catch (Exception ex)
            {
                Thread.Sleep(5000);
                ListenChannel.BasicNack(e.DeliveryTag, false, true);
            }
        }
    }
}
相關文章
相關標籤/搜索