驅動組件.NET版本
官網推薦驅動:RabbitMQ.Clienthtml
https://www.rabbitmq.com/devtools.html#dotnet-devjava
Connection和Channel
Connection是一個TCP鏈接,通常服務器這種資源都是很寶貴的,因此提供了Channel,完成消息的發佈消費。這樣Connection就能夠作成單例模式的。服務器
一、事件負載均衡
Connection和Channel裏面包含了幾個事件。分別在不一樣的狀況下觸發異步
其餘時間執行發生異常,就會執行這個ide
Connection.CallbackException性能
恢復鏈接成功fetch
Connection.RecoverySucceededui
鏈接恢復異常時會觸發這個事件spa
Connection.ConnectionRecoveryError
RabbitMQ出於自身保護策略,經過阻塞方式限制寫入,致使了生產者應用「假死」,不對外服務。比若說CPU IO RAM降低,隊列堆積,致使堵塞。 就會觸發這個事件
Connection.ConnectionBlocked
阻塞解除會觸發這個事件
Connection.ConnectionUnblocked
connection斷開鏈接時候
Connection.ConnectionShutdown
-------------------------------------------------------------------------------------------------------------------------------------------------------
.NET RabbitMQ.Client中Channel叫作Model
channel斷開鏈接時候觸發
Channel.ModelShutdown
其餘時間執行發生異常,就會執行這個
Channel.CallbackException
broker 發現當前消息沒法被路由到指定的 queues 中(若是設置了 mandatory 屬性,則 broker 會先發送 basic.return)
Channel.BasicReturn
Channel.BasicRecoverOk
Signalled when a Basic.Nack command arrives from the broker.
Channel.BasicNacks
Signalled when a Basic.Ack command arrives from the broker.
Channel.BasicAcks
Channel.FlowControl
二、屬性
最大channel數量
connetion.ChannelMax
服務上這個鏈接的對象屬性
connetion.ClientProperties
服務器上這個鏈接的名字
connetion.ClientProvidedName
關閉緣由
connetion.CloseReason
端口
connetion.Endpoint
和客戶端通訊時所容許的最大的frame size
connetion.FrameMax
鏈接的心跳包
connetion.Heartbeat
是否打開
connetion.IsOpen
獲取vhost
connetion.KnownHosts
本地端口
connetion.LocalPort
鏈接串使用的協議
connetion.Protocol
遠程端口,服務器
connetion.RemotePort
服務器屬性
connetion.ServerProperties
關停信息
connetion.ShutdownReport
----------------------------------------------------------------------------------------------------------------------------------------
channel編號
channel.ChannelNumber
關閉緣由
channel.CloseReason
鏈接超時時間
channel.ContinuationTimeout
channel.IsClosed
channel.IsOpen
下一個消息編號
channel.NextPublishSeqNo
三、方法
終止鏈接以及他們的channel,能夠指定時間長度。
connetion.Abort()
關閉鏈接以及他的channel
connetion.Close()
建立channel
connetion.CreateModel()
connetion.HandleConnectionBlocked()
connetion.HandleConnectionUnblocked()
發送消息Confirm模式
目的確認消息是否到達消息隊列中
一、mandatory
broker 發現當前消息沒法被路由到指定的 queues 中(若是設置了 mandatory 屬性,則 broker 會先發送 basic.return)
channel.BasicReturn += Channel_BasicReturn; channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { Console.WriteLine("Channel_BasicReturn"); }
二、普通Confirm模式
channel.ConfirmSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body); if (channel.WaitForConfirms()) { Console.WriteLine("普通發送方確認模式"); }
三、批量Confirm模式
channel.ConfirmSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.WaitForConfirmsOrDie(); Console.WriteLine("普通發送方確認模式");
四、異步Confirm模式
java版本組件有
五、事物
try { //聲明事物 channel.TxSelect(); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); //提交事物 channel.TxCommit(); } catch (Exception) { //回滾 channel.TxRollback(); }
上面說的是生產者發佈消息確認,那麼消費者消費如何確認呢,你們都知道消費者有ack機制,可是用到事物的時候,是怎樣的呢
1.autoAck=false手動應對的時候是支持事務的,也就是說即便你已經手動確認了消息已經收到了,但在確認消息會等事務的返回解決以後,在作決定是確認消息仍是從新放回隊列,若是你手動確認如今以後,又回滾了事務,那麼已事務回滾爲主,此條消息會從新放回隊列;
2.autoAck=true若是自定確認爲true的狀況是不支持事務的,也就是說你即便在收到消息以後在回滾事務也是於事無補的,隊列已經把消息移除了;
事物比較耗性能
簡單消息發送
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.140.161"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "TestVHost"; //建立connetion using (var connetion = factory.CreateConnection()) { connetion.CallbackException += Connetion_CallbackException; connetion.RecoverySucceeded += Connetion_RecoverySucceeded; connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError; connetion.ConnectionBlocked += Connetion_ConnectionBlocked; connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked; //鏈接關閉的時候 connetion.ConnectionShutdown += Connetion_ConnectionShutdown; //建立channel using (var channel = connetion.CreateModel()) { //消息會在什麼時候被 confirm? //The broker will confirm messages once: //broker 將在下面的狀況中對消息進行 confirm : //it decides a message will not be routed to queues //(if the mandatory flag is set then the basic.return is sent first) or //broker 發現當前消息沒法被路由到指定的 queues 中(若是設置了 mandatory 屬性,則 broker 會先發送 basic.return) //a transient message has reached all its queues(and mirrors) or //非持久屬性的消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中) //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or //持久消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync) //a persistent message has been consumed(and if necessary acknowledged) from all its queues //持久消息從其所在的全部 queue 中被 consume 了(若是必要則會被 acknowledge) //broker 發現當前消息沒法被路由到指定的 queues 中(若是設置了 mandatory 屬性,則 broker 會先發送 basic.return) channel.BasicReturn += Channel_BasicReturn; //(能夠不聲明)若是不聲明交換機 ,那麼就使用默認的交換機 (每個vhost都會有一個默認交換機) //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true); //建立一個隊列 bool durable(持久化), bool exclusive(專有的), bool autoDelete(自動刪除) //channel.QueueDeclare("TestQueue", true, false, false, null); //不作綁定的話,使用默認的交換機。 //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null); //發佈消息 var body = Encoding.UTF8.GetBytes("西伯利亞的狼"); channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body); } Console.WriteLine("Hello World!"); Console.ReadKey(); } } private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e) { Console.WriteLine("Channel_BasicReturn"); } private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("Connetion_ConnectionShutdown"); } private static void Connetion_ConnectionUnblocked(object sender, EventArgs e) { Console.WriteLine("Connetion_ConnectionUnblocked"); } private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e) { Console.WriteLine("Connetion_ConnectionBlocked"); } private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e) { Console.WriteLine("Connetion_ConnectionRecoveryError"); } private static void Connetion_RecoverySucceeded(object sender, EventArgs e) { Console.WriteLine("Connetion_RecoverySucceeded"); } private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e) { Console.WriteLine("Connetion_CallbackException"); }
場景分析
消息持久化
Broker持久化、交換機持久化、隊列持久化 。目的是維持重啓後 這些東西的存在。
消息持久化,纔是把消息持久化到硬盤中,由於消息在隊列中,因此須要隊列持久化。
設置消息持久化,須要設置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)). 默認的就是持久化。
//發佈消息 var body = Encoding.UTF8.GetBytes("西伯利亞的狼"); BasicProperties pro = new BasicProperties(); pro.DeliveryMode = 2; channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);
消費者消費消息
爲了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息而且處理完成後,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號後,就能夠刪除掉這條已經處理的消息任務。但若是消費端掛掉了(好比,通道關閉、鏈接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會從新將消息入隊,若是有另一個消費端在線,就會快速的從新發送到另一個消費端。
RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ纔會從新分發消息。
ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.140.161"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; factory.VirtualHost = "TestVHost"; //建立connetion using (var connetion = factory.CreateConnection()) { using (var channel = connetion.CreateModel()) { //構造消費者實例 var consumer = new EventingBasicConsumer(channel); //綁定消息接收後的事件委託 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模擬耗時 Console.WriteLine(" [x] Done"); // 主要改動的是將 autoAck:true修改成autoAck:fasle,以及在消息處理完畢後手動調用BasicAck方法進行手動消息確認。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //啓動消費者 channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } using (var connetion = factory.CreateConnection()) { using (var channel = connetion.CreateModel()) { //構造消費者實例 var consumer = new EventingBasicConsumer(channel); //綁定消息接收後的事件委託 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模擬耗時 Console.WriteLine(" [x] Done"); }; //啓動消費者 channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
消費負載均衡
一、當一個隊列有多個消費者時,隊列會以循環(round-robin)的方式發送給消費者。每條消息只會給一個訂閱的消費者。
二、默認狀況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每一個消費者將得到相同數量的消息。這種分發消息的方式叫作循環(round-robin)。
三、RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閒忙狀況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理通常任務處於空置狀態,而只是它們分配的任務數量同樣。
但咱們能夠經過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息,也就確保了當消費端處於忙碌狀態時,再也不分配任務。
分庫分表模式
好比說客戶積分同步。
通常電商中這種數據量比較大,及時性比較高。
ID 編號 1-10000的用戶積分表更放在隊列1,10001-20000放在隊列2,不一樣的消費者消費不一樣隊列。以此類推...
RPC
第一步,主要是進行遠程調用的客戶端須要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。
遠程調用客戶端:
//申明惟一guid用來標識這次發送的遠程調用請求 var correlationId = Guid.NewGuid().ToString(); //申明須要監聽的回調隊列 var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueue;//指定回調隊列 properties.CorrelationId = correlationId;//指定消息惟一標識 string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //發佈消息 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"[*] Request fib({number})"); // //建立消費者用於處理消息回調(遠程調用返回結果) var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。 if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"[x]: {responseMsg}"); } };
遠程調用服務端:
//申明隊列接收遠程調用請求 channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message."); //請求處理邏輯 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //從請求的參數中獲取請求的惟一標識,在消息回傳時一樣綁定 var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //將遠程調用結果發送到客戶端監聽的隊列上 channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //手動發回消息確認 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);