重要參考文章來源:http://gigi.nullneuron.net/gigilabs/resilient-connections-with-rabbitmq-net-client/服務器
緣由是這樣的,我在Windows客戶端有一個Windows後臺服務,負責與服務端的數據交互,數據上傳及數據下載架構
1.數據上傳部分是使用的rabbitmq donnet庫發送消息至RabbittMQ服務器,服務器另外有一個應用程序會監控RabbitMQ服務器的指定隊列,完成數據的上傳服務併發
2.數據下載部分是使用的rabbitmq donnet庫監控RabbitMQ服務器指定的隊列,服務器應用程序將數據發送到指定的RabbitMQ服務器的隊列中,客戶端就會能獲取到服務器應用發送過來的數據函數
這樣的架構仍是比較好用的,使用的效果目前還行,但遇到一個比較頭痛的問題,Windows後臺服務一直在Windows平板電腦上運行,除非手動安裝及更新應用時,纔會將Windows服務進行從新安裝或重啓,其餘的狀況是不會進行重啓的oop
瞭解到RabbitMQ是有自動重連的技術的,能夠參考地址:https://yq.aliyun.com/articles/369969spa
這個效果只做用於,服務器沒有掛掉,只是中間有一些網絡問題時才能夠進行重連.net
但有一種狀況是沒有處理到的,咱們已經在客戶端對RabbitMQ某個隊列進行監控,但服務器忽然掛掉,而後幾分鐘後從新啓動了,這時,客戶端能夠從新創建鏈接,但卻不會自動對隊列產生監控,沒法拿到消息線程
現時對代碼作出一些處理code
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using System.Threading; using RabbitMQ.Client.Events; using System.IO; namespace MES_MonitoringService.Common { public class RabbitMQClientHandler { private static string defaultRabbitMQHostName = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerHostName"); private static string defaultRabbitMQPort = Common.ConfigFileHandler.GetAppConfig("RabbitMQServerPort"); private static string defaultRabbitMQUserName = Common.ConfigFileHandler.GetAppConfig("RabbitMQUserName"); private static string defaultRabbitMQPassword = Common.ConfigFileHandler.GetAppConfig("RabbitMQPassword"); private static string defaultRabbitVirtualHost = Common.ConfigFileHandler.GetAppConfig("RabbitMQVirtualHost"); // 定義一個靜態變量來保存類的實例 private static RabbitMQClientHandler uniqueInstance; //定義一個標識確保線程同步 private static readonly object locker = new object(); /*-------------------------------------------------------------------------------------*/ //ConnectionFactory private static ConnectionFactory mc_ConnectionFactory = null; //Connection private static IConnection Connection; //發送頻道及接收頻道分開,避免互相影響,致使整個服務不可用 //Send Channel private static IModel SendChannel; //Listen Channel private static IModel ListenChannel; //數據監控隊列 private static string MC_SyncDataConsume = string.Empty; // //private SyncDataHandler syncDataHandlerClass; /*-------------------------------------------------------------------------------------*/ /// <summary> /// 定義私有構造函數,使外界不能建立該類實例 /// </summary> public RabbitMQClientHandler() { Reconnect(); } /// <summary> /// 定義公有方法提供一個全局訪問點,同時你也能夠定義公有屬性來提供全局訪問點 /// </summary> /// <returns></returns> public static RabbitMQClientHandler GetInstance() { // 當第一個線程運行到這裏時,此時會對locker對象 "加鎖", // 當第二個線程運行該方法時,首先檢測到locker對象爲"加鎖"狀態,該線程就會掛起等待第一個線程解鎖 // lock語句運行完以後(即線程運行完以後)會對該對象"解鎖" // 雙重鎖定只須要一句判斷就能夠了 if (uniqueInstance == null) { lock (locker) { // 若是類的實例不存在則建立,不然直接返回 if (uniqueInstance == null) { uniqueInstance = new RabbitMQClientHandler(); } } } return uniqueInstance; } static void Connect() { try { Common.LogHandler.WriteLog("獲取RabbitMQ服務器參數:" + defaultRabbitMQHostName + ":" + defaultRabbitMQPort + " (" + defaultRabbitMQUserName + "/" + defaultRabbitMQPassword + ")"); //鏈接工廠 mc_ConnectionFactory = new ConnectionFactory(); //鏈接工廠信息 mc_ConnectionFactory.HostName = defaultRabbitMQHostName;// "localhost"; int rabbitmq_port = 5672;// 默認是5672端口 int.TryParse(defaultRabbitMQPort, out rabbitmq_port); mc_ConnectionFactory.Port = rabbitmq_port;// "5672" mc_ConnectionFactory.UserName = defaultRabbitMQUserName;// "guest"; mc_ConnectionFactory.Password = defaultRabbitMQPassword;// "guest"; mc_ConnectionFactory.VirtualHost = defaultRabbitVirtualHost;// "/" mc_ConnectionFactory.RequestedHeartbeat = 30;//心跳包 mc_ConnectionFactory.AutomaticRecoveryEnabled = true;//自動重連 mc_ConnectionFactory.TopologyRecoveryEnabled = true;//拓撲重連 mc_ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); //建立鏈接 Connection = mc_ConnectionFactory.CreateConnection(); //斷開鏈接時,調用方法自動重連 Connection.ConnectionShutdown += Connection_ConnectionShutdown; //建立發送頻道 SendChannel = Connection.CreateModel(); //建立接收頻道 ListenChannel = Connection.CreateModel(); //發送頻道確認模式,發送了消息後,能夠收到迴應 SendChannel.ConfirmSelect(); if(!string.IsNullOrEmpty(MC_SyncDataConsume)) { //從新監控消息 RabbitmqMessageConsume(MC_SyncDataConsume); } Common.LogHandler.WriteLog("嘗試鏈接至RabbitMQ服務器:" + defaultRabbitMQHostName); } catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException e) { throw e; } catch (Exception ex) { throw ex; } } static void Cleanup() { try { if (SendChannel != null && SendChannel.IsOpen) { try { SendChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的Channel[發送],但遇到錯誤", ex); } SendChannel = null; } if (ListenChannel != null && ListenChannel.IsOpen) { try { ListenChannel.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的Channel[接收],但遇到錯誤", ex); } ListenChannel = null; } if (Connection != null && Connection.IsOpen) { try { Connection.Close(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ從新鏈接,正在嘗試關閉以前的鏈接,但遇到錯誤", ex); } Connection = null; } } catch (IOException ex) { throw ex; } } private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) { LogHandler.WriteLog("): RabbitMQ已經斷開鏈接,正在嘗試從新鏈接至RabbitMQ服務器"); Reconnect(); } private static void Reconnect() { try { //清除鏈接及頻道 Cleanup(); var mres = new ManualResetEventSlim(false); // state is initially false while (!mres.Wait(3000)) // loop until state is true, checking every 3s { try { //鏈接 Connect(); mres.Set(); // state set to true - breaks out of loop } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ嘗試鏈接RabbitMQ服務器出現錯誤:" + ex.Message, ex); } } } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ嘗試從新鏈接RabbitMQ服務器出現錯誤:" + ex.Message, ex); } } /*-------------------------------------------------------------------------------------*/ /// <summary> /// Direct路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool DirectExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉"); //建立一個持久化的隊列 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Direct); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,併發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Fanout路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool FanoutExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉"); //建立一個持久化的頻道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,併發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,發送消息至服務端 /// </summary> /// <param name="exchangeName">交換機名稱</param> /// <param name="routingKey">RoutingKey</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> /// <returns></returns> public bool TopicExchangePublishMessageToServerAndWaitConfirm(string exchangeName, string routingKey, string queueName, string message) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉"); if (SendChannel == null || !SendChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉"); //建立一個持久化的頻道 bool queueDurable = true; string QueueName = queueName; string ExchangeName = exchangeName; string RoutingKey = routingKey; //聲明交換機 SendChannel.ExchangeDeclare(ExchangeName, ExchangeType.Topic); //聲明隊列 SendChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //路由綁定隊列 SendChannel.QueueBind(QueueName, ExchangeName, RoutingKey, null); //設置消息持久性 IBasicProperties props = SendChannel.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;//持久性 //消息內容轉碼,併發送至服務器 var messageBody = System.Text.Encoding.UTF8.GetBytes(message); SendChannel.BasicPublish(ExchangeName, RoutingKey, props, messageBody); //等待確認 return SendChannel.WaitForConfirms(); } catch (Exception ex) { LogHandler.WriteLog("RabbitMQ出現通用問題" + ex.Message, ex); return false; } } /// <summary> /// Topic路由,接收同步消息 /// </summary> /// <param name="queueName">監聽的隊列</param> public void SyncDataFromServer(string queueName) { try { //設定參數,方便重啓RabbitMQ服務器時處理 MC_SyncDataConsume = queueName; RabbitmqMessageConsume(MC_SyncDataConsume); } catch (Exception ex) { LogHandler.WriteLog("TopicExchangeConsumeMessageFromServer運行錯誤:" + ex.Message, ex); throw ex; } } private static void RabbitmqMessageConsume(string queueName) { try { if (Connection == null || !Connection.IsOpen) throw new Exception("鏈接爲空或鏈接已經關閉"); if (ListenChannel == null || !ListenChannel.IsOpen) throw new Exception("通道爲空或通道已經關閉"); bool queueDurable = true; string QueueName = queueName; //在MQ上定義一個持久化隊列,若是名稱相同不會重複建立 ListenChannel.QueueDeclare(QueueName, queueDurable, false, false, null); //輸入1,那若是接收一個消息,可是沒有應答,則客戶端不會收到下一個消息 ListenChannel.BasicQos(0, 1, false); //建立基於該隊列的消費者,綁定事件 var consumer = new EventingBasicConsumer(ListenChannel); //迴應消息監控 consumer.Received += SyncData_Received; //綁定消費者 ListenChannel.BasicConsume(QueueName, //隊列名 false, //false:手動應答;true:自動應答 consumer); Common.LogHandler.WriteLog("開始監控RabbitMQ服務器,隊列" + QueueName); } catch (Exception ex) { throw ex; } } private static void SyncData_Received(object sender, BasicDeliverEventArgs e) { try { //TOOD 驗證程序退出後消費者是否退出去了 var body = e.Body; //消息主體 var message = Encoding.UTF8.GetString(body); LogHandler.WriteLog("[x] 隊列接收到消息:" + message.ToString()); //處理數據 bool processSuccessFlag = new SyncDataHandler().ProcessSyncData(message); if (processSuccessFlag) { //回覆確認 ListenChannel.BasicAck(e.DeliveryTag, false); } else { //未正常處理的消息,從新放回隊列 ListenChannel.BasicReject(e.DeliveryTag, true); } } catch (RabbitMQ.Client.Exceptions.OperationInterruptedException ex1) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } catch (Exception ex) { Thread.Sleep(5000); ListenChannel.BasicNack(e.DeliveryTag, false, true); } } } }