本篇介紹一下RabbitMQ中的消費模式,在前邊的全部栗子中咱們採用的消費者都是EventingBasicConsumer,其實RabbitMQ中還有其餘兩種消費模式:BasicGet和QueueBaicConsumer,下邊介紹RabiitMQ的消費模式,及使用它們時須要注意的一些問題。html
使用Web管理工具添加exchange、queue並綁定,bindingKey爲「mykey」,以下所示:工具
生產者代碼以下:fetch
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"【{message}】發送到Broke成功!"); } } } Console.ReadKey(); }
EventingBasicConsumer是發佈/訂閱模式的消費者,即只要訂閱的queue中有了新消息,Broker就會當即把消息推送給消費者,這種模式能夠保證消息及時地被消費者接收到。EventingBasicConsumer是長鏈接的:只須要建立一個Connection,而後在Connection的基礎上建立通道channel,消息的發送都是經過channel來執行的,這樣能夠減小Connection的建立,比較節省資源。前邊咱們已經使用了不少次EventingBaiscConsumer,這裏簡單展現一下使用的方式,註釋比較詳細,就很少介紹了。ui
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義一個EventingBasicConsumer消費者 var consumer = new EventingBasicConsumer(channel); //接收到消息時觸發的事件 consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body)); }; Console.WriteLine("消費者準備就緒...."); //調用消費方法 queue指定消費的隊列,autoAck指定是否自動確認,consumer就是消費者對象 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }
執行程序,結果以下,只要咱們在生產者端發送一條消息到Broker,Broker就會當即推送消息到消費者。spa
咱們知道使用EventingBasicConsumer可讓消費者最及時地獲取到消息,使用EventingBasicConsumer模式時消費者在被動的接收消息,即消息是推送過來的,Broker是主動的一方。那麼能不能讓消費者做爲主動的一方,消費者何時想要消息了,就本身發送一個請求去找Broker要?答案使用Get方式。Get方式是短鏈接的,消費者每次想要消息的時候,首先創建一個Connection,發送一次請求,Broker接收到請求後,響應一條消息給消費者,而後斷開鏈接。RabbitMQ中Get方式和HTTP的請求響應流程基本同樣,Get方式的實時性比較差,也比較耗費資源。咱們看一個Get方式的栗子:code
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region BasicGet //經過BasicGet獲取消息,開啓自動確認 BasicGetResult result = channel.BasicGet(queue:"myqueue",autoAck:true); Console.WriteLine($"接收到消息【{Encoding.UTF8.GetString(result.Body)}】"); //打印exchange和routingKey Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}"); Console.ReadLine(); #endregion } } }
執行生成者和消費者程序,生產者發送三條消息,而消費者只獲取了一條消息,這是由於channel.BasicGet()一次只獲取一條消息,獲取到消息後就把鏈接斷開了。server
補充:RabbitMQ還有一種消費者QueueBaicConsumer,用法和Get方式相似,QueueBaicConsumer在官方API中標記已過期,這裏再也不介紹,有興趣的小夥伴能夠本身研究下。 htm
在介紹Qos(服務質量)前咱們先看一下使用EventingBasicConsumer的一個坑,使用代碼演示一下,簡單修改一下上邊栗子的代碼對象
生產者代碼以下,這裏生產者發送了100條消費到Brokerblog
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); #region 添加100條數據 for (int i = 0; i < 100; i++) { channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: Encoding.UTF8.GetBytes($"第{i}條消息")); } #endregion } } Console.ReadKey(); }
消費端代碼以下,消費端採用的是自動確認(autoAck=true),即Broker把消息發送給消費者就會確認成功,不關心消息有沒有處理完成,假設每條消息處理須要5s
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); //接收到消息時執行的任務 consumer.Received += (model, ea) => { Thread.Sleep(1000 * 5); Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消費者準備就緒...."); //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }
咱們先執行生產者程序,執行完成後發現queue中有了100條ready狀態的消息,表示消息成功發送到了隊列
接着咱們執行消費者,消費者執行後,Broker會把消息一股腦發送過去,經過Web管理界面咱們看到queue中已經沒有消息了,以下:
咱們再看一下消費者的執行狀況,發現消費者僅僅處理了4條消息,還有96條消息沒有處理,這就是說消費者沒有處理完消息,可是queue中的消息都已經刪除了。若是這時消費者掛掉了,全部未處理的消息都會丟失,在某些場合中,丟失數據的後果是十分嚴重的。
對於上邊的問題,咱們可能會想到使用顯示確認來保證消息不會丟失:將BasicConsume方法的autoAck設置爲false,而後處理一條消息後手動確認一下,這樣的話已處理的消息在接收到確認回執時被刪除,未處理的消息以Unacked狀態存放在queue中。若是消費者掛了,Unacked狀態的消息會自動從新變成Ready狀態,如此一來就不用擔憂消息丟失了,修改消費者代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); //接收到消息時執行的任務 consumer.Received += (model, ea) => { Thread.Sleep(1000 * 5); //處理完成,手動確認 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消費者準備就緒...."); //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }
從新執行生產者,而後執行消費者,Web管理其中看到結果以下:在執行消費者時,消息會一股腦的發送給消費者,而後狀態都變成Unacked,消費者執行完一條數據手動確認後,這條消息從queue中刪除。當消費者掛了(咱們能夠直接把消費者關掉來模擬掛掉的狀況),沒有處理的消息會自動從Unacked狀態變成Ready狀態,不用擔憂消息丟失了!打開Web管理界面看到狀態以下:
經過顯式確認的方式能夠解決消息丟失的問題,但這種方式也存在一些問題:①當消息有十萬,百萬條時,一股腦的把消息發送給消費者,可能會形成消費者內存爆滿;②當消息處理比較慢的時,單一的消費者處理這些消息可能很長時間,咱們天然想到再添加一個消費者加快消息的處理速度,可是這些消息都被原來的消費者接收了,狀態爲Unacked,因此這些消息不會再發送給新添加的消費者。針對這些問題怎麼去解決呢?
RabbitMQ提供的Qos(服務質量)能夠完美解決上邊的問題,使用Qos時,Broker不會再把消息一股腦的發送給消費者,咱們能夠設置每次傳輸給消費者的消息條數n,消費者把這n條消息處理完成後,再獲取n條數據進行處理,這樣就不用擔憂消息丟失、服務端內存爆滿的問題了,由於沒有發送的消息狀態都是Ready,因此當咱們新增一個消費者時,消息也能夠當即發送給新增的消費者。注意Qos只有在消費端使用顯示確認時纔有效,使用Qos的方式十分簡單,在消費端調用 channel.BasicQos() 方法便可,修改服務端代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); //接收到消息時執行的任務 consumer.Received += (model, ea) => { Thread.Sleep(1000 * 5); //處理完成,手動確認 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消費者準備就緒...."); //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }
清空一下queue中的消息,從新啓動生產者,而後啓動消費者,打開Web管理界面,看到狀態以下所示:
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false) 方法中參數prefetchSize爲預取的長度,通常設置爲0便可,表示長度不限;prefetchCount表示預取的條數,即發送的最大消息條數;global表示是否在Connection中全局設置,true表示Connetion下的全部channel都設置爲這個配置。
本節演示了RabbitMQ的兩種消費者:EventingBasicConsumer和BasicGet。EventingBasicConsumer是基於長鏈接,發佈訂閱模式的消費方式,節省資源且實時性好,這是開發中最經常使用的消費模式。在一些須要消費者主動獲取消息的場合,咱們可使用Get方式,Get方式是基於短鏈接的,請求響應模式的消費方式。
Qos能夠設置消費者一次接收消息的最大條數,可以解決消息擁堵時形成的消費者內存爆滿問題。Qos也比較適用於耗時任務隊列,當任務隊列中的任務不少時,使用Qos後咱們能夠隨時添加新的消費者來提升任務的處理效率。
原文出處:https://www.cnblogs.com/wyy1234/p/10883568.html