在一些場合,如轉帳、付費時每一條消息都必須保證成功的被處理。AMQP是金融級的消息隊列協議,有很高的可靠性,這裏介紹在使用RabbitMQ時怎麼保證消息被成功處理的。消息確承認以分爲兩種:一種是生產者發送消息到Broke時,Broker給生產者發送確認回執,用於告訴生產者消息已被成功發送到Broker;一種是消費者接收到Broker發送的消息時,消費者給Broker發送確認回執,用於通知消息已成功被消費者接收。html
下邊分別介紹生產者端和消費者端的消息確認方法。準備條件:使用Web管理工具添加exchange、queue並綁定,bindingKey爲「mykey」,以下所示:web
生產者端的消息確認:當生產者將消息發送給Broker,Broker接收到消息給生產者發送確認回執。生產者端的消息確認有兩種方式:tx機制和Confirm模式。服務器
tx機制能夠叫作事務機制,RabbitMQ中有三個與tx機制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用於將當前channel設置成transaction模式, channel.txCommit() 提交事務, channel.txRollback() 回滾事務。使用tx機制,咱們首先要經過txSelect方法開啓事務,而後發佈消息給broker服務器了,若是txCommit提交成功了,則說明消息成功被broker接收了;若是在txCommit執行以前broker異常崩潰或者因爲其餘緣由拋出異常,這個時候咱們能夠捕獲異常,經過txRollback回滾事務。看一個tx機制的簡單實現:app
var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //發送消息 //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //開啓事務機制 channel.TxSelect(); //發送消息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //事務提交 channel.TxCommit(); Console.WriteLine($"【{message}】發送到Broke成功!"); } catch (Exception) { Console.WriteLine($"【{message}】發送到Broker失敗!"); channel.TxRollback(); } } } } Console.ReadKey(); }
程序運行結果以下:工具
C#的RabbitMQ API中,有三個與Confirm相關的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示開啓Confirm模式; channel.WaitForConfirms() 等待全部消息確認,若是全部的消息都被服務端成功接收返回true,只要有一條沒有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms做用類型,也是等待全部消息確認,區別在於該方法沒有返回值(Void),若是有任意一條消息沒有被成功接收,該方法會當即拋出一個OperationInterrupedException類型異常。看一個Confirm模式的簡單實現:ui
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //開啓Confirm模式 channel.ConfirmSelect(); //發送消息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //WaitForConfirms確認消息(能夠同時確認多條消息)是否發送成功 if (channel.WaitForConfirms()) { Console.WriteLine($"【{message}】發送到Broke成功!"); } } } } Console.ReadKey(); }
程序運行結果:spa
從Broke發送到消費者時,RabbitMQ提供了兩種消息確認的方式:自動確認和顯示確認。3d
自動確認:當RabbbitMQ將消息發送給消費者後,消費者端接收到消息後,不等待消息處理結束,當即自動回送一個確認回執。自動確認的用法十分簡單,設置消費方法的參數autoAck爲true便可,咱們前邊的例子都是使用的自動確認,這裏再也不詳細演示,以下:code
channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);
注意:Broker會在接收到確認回執時刪除消息,若是消費者接收到消息並返回了確認回執,而後這個消費者在處理消息時掛了,那麼這條消息就再也找不回來了。server
咱們知道自動確承認能會出現消息丟失的問題,咱們難免會想到:Broker收到回執後才刪除消息,若是可讓消費者在接收消息時不當即返回確認回執,等到消息處理完成後(或者完成一部分的邏輯)再返回確認回執,這樣就保證消費端不會丟失消息了!這正是顯式確認的思路。使用顯示確認也比較簡單,首先將Resume方法的參數autoAck設置爲false,而後在消費端使用代碼 channel.BasicAck()/BasicReject()等方法 來確認和拒絕消息。看一個栗子:
生產者代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //發送消息 //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本發佈 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已發送到隊列"); } } } Console.ReadKey(); }
消費者代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"接受到消息【{message}】"); //以news開頭表示是新聞類型,處理完成後確認消息 if (message.StartsWith("news")) { //這裏處理消息balabala Console.WriteLine($"【{message}】是新聞消息,處理消息並確認"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } //不以news開頭表示不是新聞類型,不進行處理,把消息退回到queue中 else { Console.WriteLine($"【{message}】不是新聞類型,拒絕處理"); channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); } }; Console.WriteLine("消費者準備就緒...."); //第五步:處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } }
介紹一下代碼中標紅的兩個方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用於確認消息,deliveryTag參數是分發的標記,multiple表示是否確認多條。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用於拒絕消息,deliveryTag也是指分發的標記,requeue表示消息被拒絕後是否從新放回queue中,true表示放回queue中,false表示直接丟棄。
運行這兩個應用程序,經過生產者發送兩條消息,效果以下:
一些意外的狀況:使用顯式確認時,若是消費者處理完消息不發送確認回執,那麼消息不會被刪除,消息的狀態一直是Unacked,這條消息也不會再發送給其餘消費者。若是一個消費者在處理消息時還沒有發送確認回執的狀況下掛掉了,那麼消息會被從新放入隊列(狀態從Unacked變成Ready),有其餘消費者存時,消息會發送給其餘消費者。
在前邊已經介紹了exchange和queue的持久化,把exchange和queue的durable屬性設置爲true,重啓rabbitmq服務時( 重啓命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也會恢復。咱們須要注意的是:若是queue設置durable=true,rabbitmq服務重啓後隊列雖然會存在,可是隊列內的消息會丟所有丟失。那麼怎麼實現消息的持久化呢?實現的方法很簡單:將exchange和queue都設置durable=true,而後在消息發佈的時候設置persistent=true便可。看一個栗子:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //設置消息持久化 var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: body); Console.WriteLine($"【{message}】發送到Broke成功!"); } } } Console.ReadKey(); }
聲明exchange和queue時設置durable=true,而後執行上邊的代碼,傳入一條消息。重啓rabbitmq後,exchange,queue和消息都會恢復。咱們也能夠在web管理界面設置消息持久化,以下:
咱們知道queue是先進先出的,即先發送的消息,先被消費。可是在具體業務中可能會遇到要提早處理某些消息的需求,如一個常見的需求:普通客戶的消息按先進先出的順序處理,Vip客戶的消息要提早處理。消息實現優先級控制的實現方式是:首先在聲明queue是設置隊列的x-max-priority屬性,而後在publish消息時,設置消息的優先級等級便可。爲了演示方便,約定全部vip客戶的信息都以vip開頭,看一下代碼實現:
生產者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { //隊列優先級最高爲10,不加x-max-priority的話,計算髮布時設置了消息的優先級也不會生效 {"x-max-priority",10 } }); //綁定exchange和queue channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey"); Console.WriteLine("生產者準備就緒...."); //一些待發送的消息 string[] msgs = { "vip1", "hello2", "world3","common4", "vip5" }; //設置消息優先級 var props = channel.CreateBasicProperties(); foreach (string msg in msgs) { //vip開頭的消息,優先級設置爲9 if (msg.StartsWith("vip")) { props.Priority = 9; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } //其餘消息的優先級爲1 else { props.Priority = 1; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } } } } Console.ReadKey(); }
消費者,不須要對消費者作額外的配置,代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body)); }; Console.WriteLine("消費者準備就緒...."); //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }
運行程序,結果以下,咱們看到vip開頭的消息被率先處理了,證實優先級是生效的
本節簡單介紹了Rabbitmq中的消息確認,消息持久化,消息優先級的實現方式,這幾個功能在開發中會常常用到,RabbitMQ還有一些其餘有用的功能,如Lazy queue模式,dead letter處理,queue的消息條數、字節數限制等,這裏沒有具體演示,有興趣的園友能夠本身研究一下。
原文出處:https://www.cnblogs.com/wyy1234/p/10868416.html