目錄:html
輪詢調度(Round-robin dispatching):即依次分配分配任務給worker。shell
消息答覆(Message acknowledgement):在consumer處理完以後,進行消息答覆。避免殺掉worker後,message消息。併發
消息持久化(Message durability):在RabbitMQ server中止後,確保message不會丟失。須要持久化queue和message函數
公平調度(Fair dispatch):爲了使worker不會出現有的一直在busy,而有的一致很閒的狀態。使用的是 channel.BasicQos(0, 1, false) ,確保worker確認完成上一個任務後,纔會分配下一個。fetch
代碼spa
在第一個教程中,咱們講了在一個指定的queue中發送和接收message. 下面咱們講一個用於在多個worker之間分配費時任務的工做隊列(Work Queue).code
Work Queue的主要思想就是避免當即作一個資源集中型任務而且還必須等待它完成。server
咱們把任務封裝成一個message,而且發送到隊列。這裏面的worker實際就是consumer,以後會由它們執行這些任務。htm
咱們會統計字符串中的 . 來使程序sleep。即便用Thread.Sleep()。例如Hello...會花費3秒。blog
在這裏咱們的producer叫作NewTask。而咱們的consumer叫作worker。
它們能夠在上一節的基礎上作一些修改獲得
發送消息代碼修改
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
接收消息代碼修改
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
這個裏面的消息依然是自動答覆的
上面的修改是爲了模擬真實耗時任務。
使用隊列任務的一個好處就是能很容易的進行併發任務。
首先,咱們嘗試同時運行兩個worker。它們能夠同時從隊列中取到message。那麼具體是怎樣呢?
你須要打開三個控制檯程序,兩個運行worker程序。
# shell 1 cd Worker dotnet run# => [*] Waiting for messages. To exit press CTRL+C
# shell 2 cd Worker dotnet run# => [*] Waiting for messages. To exit press CTRL+C
第三個用來發布new tasks.
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."
咱們看下兩個worker中的結果:
默認狀況下,RabbitMQ會輪詢發送每一個message。因此,平均來講,每一個consumer會獲得相同數量的messages . 這種分發message的方式叫作輪詢。
同時,注意到,queue中的message只能發到一個worker裏,即兩個worker裏的task不會重複,即這是一種點對點的方式。
你想下,若是一個consumer正在進行一個長任務(long task),而且就完成了一部分就死掉了。那麼會發生什麼呢?在咱們當前的代碼裏,一旦RabbitMQ發送了一個message到一個consumer,那麼RabbitMQ裏的message馬上就會被標記爲刪除(deletion)。 在這種狀況下,若是你殺死一個worker,咱們將會丟失這個worker正在處理的message,咱們也會丟失全部已經分配到這個worker但還沒處理的messages。
注意:默認狀況下,並非會等每一個task在consumer中執行完纔會分發下一個message,也有多是一下分發好多條。具體能夠經過設置。
可是,咱們不想丟失任務tasks。若是一個worker死掉了,咱們想要task會被髮送到另外一個worker。
爲了message再也不丟失,RabbitMQ引入了message acknowledge。一個ack 會在一個message被接收,處理後被consumer發送回來,而且RabbitMQ把它標記爲刪除。
若是一個consumer還沒發送一個ack就死掉了。RabbitMQ會認爲它沒被徹底處理,而且re-queue 它。若是線上同時還有其餘的consumer,那麼RabbitMQ會很快的把它發送到另外一個consumer。這樣即便worker忽然死了,也沒有message會丟失了。
消息過期是不存在的。RabbitMQ將重發這個message,當consumer死掉時。即便在處理message時花費很長時間,也沒有關係(由於不存在過期)
Manual message acknowledgment(手動消息答覆) 默認是開啓的。在前一個例子中,咱們經過設置autoAck爲true把它關閉了。
如今,咱們把手動消息答覆打開(即autoAck設置爲false),而且,一旦咱們作完了一個task,咱們就發送一個確認(a acknowledgment).
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //消息答覆 }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);//設置手動消息答覆開啓
這樣,即便咱們殺死了一個worker,咱們的message也不會丟失了。
Acknowledgement必須和接收message的通道是同一個。不然會報 channel-level protocol exception。
那麼,若是咱們忘記發acknowledgement會怎麼樣呢?
忘記BasicAck是一個常發生的錯誤,可是後果卻很嚴重。當你的client退出後,messages也會被重發。可是RabbitMQ會吃掉(消耗)愈來愈多的內存,隨着它沒法釋聽任何unacked messages.
你能夠經過message_unacknowledged打印出沒確認的message
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows上
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱們已經學了當consumer被殺死時,使task不丟失。可是若是咱們的RabbitMQ server中止了,咱們的task依然會丟失。
想要server中止時,messages不丟失,須要標記queue和message是持久化的(durable)。
首先,咱們標記queue是durable
channel.QueueDeclare(queue: "hello", durable: true, //標記queue爲durable exclusive: false, autoDelete: false, arguments: null);
雖然上面的代碼自己是正確的,可是在目前卻不會生效。由於咱們以前已經定義過了一個hello的queue,它是not durable。RabbitMQ不會容許你使用不一樣的參數從新定義一個已經存在的queue,而且會報錯。
這裏,咱們先直接聲明一個不一樣名稱的queue。以下
channel.QueueDeclare(queue: "task_queue", durable: true, //標記queue爲durable exclusive: false, autoDelete: false, arguments: null);
其中,QueueDeclare須要被應用到producer和consumer的代碼裏。
如今,咱們標記messages爲persistent(永恆的)。經過設置IBasicProperties.SetPersistent爲true.
var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置message是persistent
你可能已經注意到,上面的調度仍然不能按咱們想要的工做。它可能出現兩個worker一個一直很忙,一個一直很閒(任務執行時間不同)。
這是由於RabbitMQ會被分發,當message輸入一個queue。它不會看一個consumer未完成的queue , 它僅僅盲目的分發第幾個到第幾個consumer.
爲了改變行爲,咱們可使用BasicQos,而且prefetchCount=1。這個會告訴RabbityMQ每次給只會給worker一個message。或者說,RabbitRQ在worker處理而且確認以前不會分發一個新的message。也能夠說,RabbitMQ會分發給下一個不忙的worker。
channel.BasicQos(0, 1, false);
注意queue的大小
若是你的全部worker都是busy的,說明你的queue已經滿了。你應該對此保持關注,而且或者你能夠增長更多的worker或者有一些其餘策略。
NewTask.cs
using System;using RabbitMQ.Client;using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //建立鏈接 using(var channel = connection.CreateModel()) //建立channel { channel.QueueDeclare(queue: "task_queue", //聲明一個durable的queue durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); //取得message var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); //設置message是persistent properties.Persistent = true; channel.BasicPublish(exchange: "", //發送 routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }
Worker.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //創建鏈接 using(var channel = connection.CreateModel()) //創建通道(channel) { channel.QueueDeclare(queue: "task_queue", //聲明queue是durable durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //設置公平的調度策略(fair dispatch) Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); //回調函數 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; //模擬真實業務花費一些時間 Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //消息答覆 }; channel.BasicConsume(queue: "task_queue", //發送而且設置手動消息答覆開啓 autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
參考網址:RabbitMQ