RabbitMQ教程C#版 - 工做隊列

先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost標準端口(5672)。若是你使用不一樣的主機、端口或證書,則須要調整鏈接設置。javascript

從哪裏得到幫助
若是您在閱讀本教程時遇到困難,能夠經過郵件列表 聯繫咱們css

工做隊列#

(使用 .NET Client)html

在 教程[1] 中,咱們編寫了兩個程序,用於從一個指定的隊列發送和接收消息。在本文中,咱們將建立一個工做隊列,用於在多個工做線程間分發耗時的任務。java

工做隊列(又名:任務隊列)背後的主要想法是避免當即執行資源密集型、且必須等待其完成的任務。相反的,咱們把這些任務安排在稍後完成。咱們能夠將任務封裝爲消息並把它發送到隊列中,在後臺運行的工做進程將從隊列中取出任務並最終執行。當您運行多個工做線程,這些任務將在這些工做線程之間共享。nginx

這個概念在Web應用程序中特別有用,由於在一個 HTTP 請求窗口中沒法處理複雜的任務。git

準備#

咱們將略微修改上一個示例中的Send程序,以其能夠在命令行發送任意消息。
這個程序將調度任務到咱們的工做隊列中,因此讓咱們把它命名爲NewTaskgithub

像 教程[1]同樣,咱們須要生成兩個項目:sql

Copy
dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore
Copy
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);

從命令行參數獲取消息的幫助方法:docker

Copy
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }

咱們舊的Receive.cs腳本也須要進行一些更改:它須要爲消息體中的每一個點模擬一秒種的時間消耗。它將處理由 RabbitMQ 發佈的消息,並執行任務,所以咱們把它複製到Worker項目並修改:shell

Copy
// 構建消費者實例。 var consumer = new EventingBasicConsumer(channel); // 綁定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模擬耗時操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

模擬虛擬任務的執行時間:

Copy
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);

循環調度#

使用任務隊列的優勢之一是可以輕鬆地並行工做。若是咱們正在積累積壓的工做,咱們僅要增長更多的工做者,並以此方式能夠輕鬆擴展。

首先,咱們嘗試同時運行兩個Worker實例。他們都會從隊列中獲取消息,但究竟如何?讓咱們來看看。

您須要打開三個控制檯,兩個運行Worker程序,這些控制檯做爲咱們的兩個消費者 - C1和C2。

Copy
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
Copy
# shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C

在第三個控制檯中,咱們將發佈一些新的任務。一旦你已經運行了消費者,你能夠嘗試發佈幾條消息:

Copy
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."

讓咱們看看有什麼發送到了咱們的Worker程序:

Copy
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
Copy
# shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'

默認狀況下,RabbitMQ 會按順序將每條消息發送給下一個消費者。消費者數量平均的狀況下,每一個消費者將會得到相同數量的消息。這種分配消息的方式稱爲循環(Round-Robin)。請嘗試開啓三個或更多的Worker程序來驗證。

消息確認#

處理一項任務可能會須要幾秒鐘的時間。若是其中一個消費者開啓了一項長期的任務而且只完成了部分就掛掉了,您可能想知道會發生什麼?在咱們當前的代碼中,一旦 RabbitMQ 把消息分發給了消費者,它會當即將這條消息標記爲刪除。在這種狀況下,若是您停掉某一個 Worker,咱們將會丟失這條正在處理的消息,也將丟失全部分發到該 Worker 但還沒有處理的消息。

可是咱們不想丟失任何一個任務。若是一個 Worker 掛掉了,咱們但願這個任務能被從新分發給其餘 Worker。

爲了確保消息永遠不會丟失,RabbitMQ 支持 消息確認 機制。消費者回發一個確認信號 Ack(nowledgement) 給 RabbitMQ,告訴它某個消息已經被接收、處理而且能夠自由刪除它。

若是一個消費者在尚未回發確認信號以前就掛了(其通道關閉,鏈接關閉或者 TCP 鏈接丟失),RabbitMQ 會認爲該消息未被徹底處理,並將其從新排隊。若是有其餘消費者同時在線,該消息將會被會迅速從新分發給其餘消費者。這樣,即使 Worker 意外掛掉,也能夠確保消息不會丟失。

沒有任何消息會超時;當消費者死亡時,RabbitMQ 將會從新分發消息。即便處理消息須要很是很是長的時間也不要緊。

默認狀況下,手動消息確認 模式是開啓的。在前面的例子中,咱們經過將autoAck(「自動確認模式」)參數設置爲true來明確地關閉手動消息確認模式。一旦完成任務,是時候刪除這個標誌而且從 Worker 手動發送一個恰當的確認信號給RabbitMQ。

Copy
// 構建消費者實例。 var consumer = new EventingBasicConsumer(channel); // 綁定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模擬耗時操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手動發送消息確認信號。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // autoAck:false - 關閉自動消息確認,調用`BasicAck`方法進行手動消息確認。 // autoAck:true - 開啓自動消息確認,當消費者接收到消息後就自動發送 ack 信號,不管消息是否正確處理完畢。 channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用上面這段代碼,咱們能夠肯定的是,即便一個 Worker 在處理消息時,咱們經過使用CTRL + C來終止它,也不會丟失任何消息。Worker 掛掉不久,全部未確認的消息將會被從新分發。

忘記確認
遺漏BasicAck是一個常見的錯誤。這是一個很簡單的錯誤,但致使的後果倒是嚴重的。當客戶端退出時(看起來像是隨機分發的),消息將會被從新分發,可是RabbitMQ會吃掉愈來愈多的內存,由於它不能釋放未確認的消息。
爲了調試這種錯誤,您可使用rabbitmqctl來打印messages_unacknowledged字段:

Copy
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,刪除sudo

Copy
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化#

咱們已經學習瞭如何確保即便消費者掛掉,任務也不會丟失。可是若是 RabbitMQ 服務器中止,咱們的任務仍是會丟失。

當 RabbitMQ 退出或崩潰時,它會忘記已存在的隊列和消息,除非告訴它不要這樣作。爲了確保消息不會丟失,有兩件事是必須的:咱們須要將隊列和消息標記爲持久

首先,咱們須要確保 RabbitMQ 永遠不會丟失咱們的隊列。爲了作到這一點,咱們須要把隊列聲明是持久的(Durable)

Copy
// 聲明隊列,經過指定 durable 參數爲 true,對消息進行持久化處理。 channel.QueueDeclare(queue: "hello",  durable: true,  exclusive: false,  autoDelete: false,  arguments: null);

雖然這個命令自己是正確的,可是它在當前設置中不會起做用。那是由於咱們已經定義過一個名爲hello的隊列,而且這個隊列不是持久化的。RabbitMQ 不容許使用不一樣的參數從新定義已經存在的隊列,並會向嘗試執行該操做的程序返回一個錯誤。但有一個快速的解決辦法 - 讓咱們用不一樣的名稱聲明一個隊列,例如task_queue

Copy
channel.QueueDeclare(queue: "task_queue",  durable: true,  exclusive: false,  autoDelete: false,  arguments: null);

注意,該聲明隊列QueueDeclare方法的更改須要同時應用於生產者和消費者代碼。

此時,咱們能夠肯定的是,即便 RabbitMQ 從新啓動,task_queue隊列也不會丟失。如今咱們須要將咱們的消息標記爲持久的(Persistent) - 經過將IBasicProperties.Persistent設置爲true

Copy
// 將消息標記爲持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true;

關於消息持久性的說明
將消息標記爲Persistent並不能徹底保證消息不會丟失。儘管它告訴 RabbitMQ 將消息保存到磁盤,但當 RabbitMQ 接收到消息而且還沒有保存消息時仍有一段時間間隔。此外,RabbitMQ 不會爲每條消息執行fsync(2) - 它可能只是保存到緩存中,並無真正寫入磁盤。消息的持久化保證並不健壯,但對於簡單的任務隊列來講已經足夠了。若是您須要一個更加健壯的保證,可使用 發佈者確認

公平調度#

您可能已經注意到調度仍然沒法徹底按照咱們指望的方式工做。例如,在有兩個 Worker 的狀況下,假設全部奇數消息都很龐大、偶數消息都很輕量,那麼一個 Worker 將會一直忙碌,而另外一個 Worker 幾乎不作任何工做。是的,RabbitMQ 並不知道存在這種狀況,它仍然會平均地分發消息。

發生這種狀況是由於 RabbitMQ 只是在消息進入隊列後就將其分發。它不會去檢查每一個消費者所擁有的未確認消息的數量。它只是盲目地將第 n 條消息分發給第 n 位消費者。

爲了改變上述這種行爲,咱們可使用參數設置prefetchCount = 1basicQos方法。

這就告訴 RabbitMQ 同一時間不要給一個 Worker 發送多條消息。或者換句話說,不要向一個 Worker 發送新的消息,直到它處理並確認了前一個消息。
相反,它會這個消息調度給下一個不忙碌的 Worker。

Copy
channel.BasicQos(0, 1, false);

關於隊列大小的說明
若是全部的 Worker 都很忙,您的隊列可能會被填滿。請留意這一點,能夠嘗試添加更多的 Worker,或者使用其餘策略。

組合在一塊兒#

咱們NewTask.cs類的最終代碼:

Copy
using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { // 實例化鏈接工廠。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 建立鏈接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 聲明隊列,標記爲持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 獲取發送消息。 var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); // 將消息標記爲持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 發送數據包。 channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }

(NewTask.cs 源碼)

還有咱們的Worker.cs

Copy
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { // 實例化鏈接工廠。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 建立鏈接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 聲明隊列,標記爲持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 告知 RabbitMQ,在未收到當前 Worker 的消息確認信號時,再也不分發給消息,確保公平調度。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); // 構建消費者實例。 var consumer = new EventingBasicConsumer(channel); // 綁定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模擬耗時操做。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手動發送消息確認信號。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

(Worker.cs 源碼)

使用消息確認機制和BasicQ您能夠建立一個工做隊列。即便 RabbitMQ 從新啓動,經過持久性選項也可以讓任務繼續存在。

有關IModel方法和IBasicProperties的更多信息,您能夠在線瀏覽 RabbitMQ .NET客戶端API參考

如今,咱們能夠繼續閱讀 教程[3],學習如何向多個消費者發送相同的消息。

寫在最後#

本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容爲準。水平有限,翻譯的很差請見諒,若有翻譯錯誤還請指正。

做者:Esofar

出處:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html

本站使用「CC BY 4.0」創做共享協議,轉載請在文章明顯位置註明做者及出處。

相關文章
相關標籤/搜索