在前面講到了RabbitMQ高可用集羣的搭建,可是咱們知道只是集羣的高可用並不能保證應用在使用消息隊列時徹底沒有問題,例如若是應用鏈接的RabbitMQ集羣忽然宕機了,雖然這個集羣時可使用的,可是應用訂閱的鏈接就斷開了,若是有個機房外網出口帶寬被挖掘機弄斷了,那集羣依然是不可用的。因此咱們後面會介紹應用APP如何與鏈接集羣來保證二者配合默契,以及如何實現跨機房的集羣複製。服務器
前面講到應用服務器經過一個負載均衡服務將鏈接的流量分發到指定服務器,若是鏈接的節點宕機怎麼辦呢。應用服務器鏈接集羣主要作兩件事,訂閱和發佈,因此若是是發佈消息每次都會從新初始化鏈接因此鏈接節點的切換對整個系統的可用性影響不大。若是是訂閱消息就沒有真麼簡單了。首先咱們要作到若是鏈接出現問題應該是拋出異常而不是終止腳本,而且這時應該從新鏈接鏈接。
好了不廢話了,代碼以下:負載均衡
ColonyProductide
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ColonyProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.130"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 10; i++) { Byte[] body = Encoding.UTF8.GetBytes("Hello World -- "+i); channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body); } } } Console.WriteLine("發送完成"); Console.ReadKey(); } } }
ColonyConsumerspa
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace ColonyConsumer { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.133"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine("等待接收消息"); Console.ReadKey(); } } } } }
因此經過以上的代碼就能夠保證服務器某節點宕機後訂閱的鏈接自動重連切換。插件
基於warren的共享存儲模式code
這種方式其實並非跨地區的遠程複製,而且須要共享存儲,若是感興趣的同窗能夠百度下。blog
基於Shovel的遠程複製rabbitmq
若是直接基於WAN來組建異地的集羣的話,集羣間大量的數據通信會產生高昂的費用,另外Erlang也不容許這麼高延遲的通信。
Shovel是RabbitMQ自帶插件(2.7.0後),自帶插件的好處就是能夠在RabbitMQ服務啓動時自動啓動Shovel和自定義複製關係。
Shovel運行的原理其實很是簡單。經過定義RabbitMQ上一個隊列和另一個RabbitMQ上的交換機之間的複製關係來實現遠程複製。也就是說它會在主服務上創建一個隊列來監聽交換機,因此這是到交換機因此的消息會投遞到該隊列,而且在從服務中訂閱這個隊列,使隊列中的消息複製到從服務的交換機中。RabbitMQ是一個比較全面的消息隊列解決方案,咱們公司並無用到該功能,只是在這提下,感興趣的同窗能夠搜下。 隊列