【c#】RabbitMQ學習文檔(二)Work Queues(工做隊列)

    今天開始RabbitMQ教程的第二講,廢話很少說,直接進入話題。   (使用.NET 客戶端 進行事例演示)
     
    在第一個教程中,咱們編寫了一個從命名隊列中發送和接收消息的程序。在本教程中,咱們將建立一個工做隊列,這個隊列將用於在多個工人之間分配耗時的任務。

    工做隊列【又名:任務隊列】背後主要的思想是避免馬上執行耗時的工做任務,而且一直要等到它結束爲止。相反,咱們規劃任務並晚些執行。咱們封裝一個任務做爲消息發送到一個命名的消息隊列中,後臺運行的工做線程將獲取任務而且最終執行該任務。當你運行不少的任務的時候他們會  共享工做線程和隊列。

    這個概念在Web應用程序中是尤爲有用的,異步執行能夠在短期內處理一個複雜Http請求。

一、準備工做

    在本系列教程的前一個教程中,咱們發送了一個包含「Hello World!」的消息,如今咱們發送一個表明複雜任務的字符串。咱們不會建立一個真實的任務,好比對圖像文件進行處理或PDF文件的渲染,所以讓咱們僞裝咱們很忙-經過採用Thread.Sleep()功能來實現複雜和繁忙。咱們將根據字符串中的點的數量做爲它的複雜性,每個點將佔一秒鐘的「工做」。例如,一個假的任務描述Hello…,有三個點,咱們就須要三秒。

    咱們將稍微修改一下咱們之前的例子中Send 程序的代碼,容許從命令行發送任意消息。這個程序將把任務發送到咱們的消息隊列中,因此咱們叫它NewTask:

   像教程一,咱們須要生成兩個項目。html

複製代碼
dotnet new console --name NewTask
    mv NewTask/Program.cs NewTask/NewTask.cs
    dotnet new console --name Worker
    mv Worker/Program.cs Worker/Worker.cs
    cd NewTask
   dotnet add package RabbitMQ.Client
   dotnet restore
   cd ../Worker
   dotnet add package RabbitMQ.Client
   dotnet restore


   var message = GetMessage(args);
   var body = Encoding.UTF8.GetBytes(message);

   var properties = channel.CreateBasicProperties();
   properties.Persistent = true;

   channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);
複製代碼


   信息數據咱們能夠從命令行的參數得到:

shell

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}


    咱們的舊Receive.cs代碼也須要一些修改:須要爲消息體中每一個點都須要消耗一秒鐘的工做,先要計算出消息體內有幾個點號,而後在乘以1000,就是這個複雜消息所消耗的時間,同時表示這是一個複雜任務。RabbitMQ將處理和發送理消息,而且執行這個任務,讓咱們拷貝如下代碼黏貼到Worker的項目中,並進行相應的修改:

緩存

複製代碼
   var consumer = new EventingBasicConsumer(channel);
   consumer.Received += (model, ea) =>
   {
       var body = ea.Body;
       var message = Encoding.UTF8.GetString(body);
       Console.WriteLine(" [x] Received {0}", message);

       int dots = message.Split('.').Length - 1;
       Thread.Sleep(dots * 1000);

       Console.WriteLine(" [x] Done");
   };
   channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
複製代碼


    咱們本身假設的任務的模擬執行時間就是:服務器

 int dots = message.Split('.').Length - 1;
 Thread.Sleep(dots * 1000);


二、輪詢調度

    咱們使用任務隊列的好處之一就是使任務能夠並行化,增長系統的並行處理能力。若是咱們正在創建一個積壓的工做,咱們能夠牢牢增長更多的Worker實例就能夠完成大量工做的處理,修改和維護就很容易。

    首先,讓咱們同時運行兩個Worker實例。他們都會從隊列中獲得消息,但具體如何?讓我想一想。

    你須要打開三個控制檯的應用程序。兩個控制檯程序將運行Wroker程序。這些控制檯程序將是咱們的兩個消費者C1和C2。異步

複製代碼
    # shell 1
    cd Worker
    dotnet run
    # => [*] Waiting for messages. To exit press CTRL+C

    # shell 2
    cd Worker
    dotnet run
    # => [*] Waiting for messages. To exit press CTRL+C
複製代碼


    在第三個控制檯應用程序中咱們將發佈新的任務。只要你已經啓動了消費者程序,你能夠看到一些發佈的信息:post

複製代碼
   # shell 3
   cd NewTask
   dotnet run "First message."
   dotnet run "Second message.."
   dotnet run "Third message..."
   dotnet run "Fourth message...."
   dotnet run "Fifth message....."
複製代碼

   讓咱們看看交付了什麼東西在Workers:

fetch

複製代碼
   # shell 1
   # => [*] Waiting for messages. To exit press CTRL+C
   # => [x] Received 'First message.'
   # => [x] Received 'Third message...'
   # => [x] Received 'Fifth message.....'

   # shell 2
   # => [*] Waiting for messages. To exit press CTRL+C
   # => [x] Received 'Second message..'
   # => [x] Received 'Fourth message....'
複製代碼


   默認狀況下,RabbitMQ將會發送每一條消息給序列中每個消費者。每一個消費者都會獲得相同數量的信息。這種分發消息的方式叫作輪詢。咱們嘗試這三個或更多的Workers。


三、消息確認

     處理一個任務可能須要幾秒鐘。若是有一個消費者開始了一個長期的任務,而且只作了一部分就發生了異常,你可能想知道到底發生了什麼。咱們目前的代碼,一旦RabbitMQ發送一個消息給客戶當即從內存中移除。在這種狀況下,若是你關掉了一個Worker,咱們將失去它正在處理的信息。咱們也將丟失發送給該特定員工但還沒有處理的全部信息。

    但咱們不想失去任何任務。若是一個Worker出現了問題,咱們但願把這個任務交給另外一個Woker。

    爲了確保消息不會丟失,RabbitMQ支持消息確認機制。ACK(nowledgement)確認消息是從【消息使用者】發送回來告訴RabbitMQ結果的一種特殊消息,確認消息告訴RabbitMQ指定的接受者已經收到、處理,而且RabbitMQ你能夠自由刪除它。

    若是一個【消費者Consumer】死亡(其通道關閉,鏈接被關閉,或TCP鏈接丟失)不會發送ACK,RabbitMQ將會知道這個消息並無徹底處理,將它從新排隊。若是有其餘用戶同時在線,它就會快速地傳遞到另外一個【消費者】。這樣你就能夠確定,沒有消息丟失,即便【Worker】偶爾死了或者出現問題。


    在沒有任何消息超時;當【消費者】死亡的時候RabbitMQ會從新發送消息。只要是正常的,即便處理消息須要很長很長的時間也會重發消息給【消費者】。

   消息確認的機制默認是打開的。在之前的例子中,咱們明確地把它們關閉設置noAck(「沒有手動確認」)參數爲true。是時候刪除這個標誌了,而且從Worker發送一個適當確認消息,一旦咱們完成了工做任務。this

複製代碼
   var consumer = new EventingBasicConsumer(channel);
   consumer.Received += (model, ea) =>
   {
       var body = ea.Body;
       var message = Encoding.UTF8.GetString(body);
       Console.WriteLine(" [x] Received {0}", message);

       int dots = message.Split('.').Length - 1;
       Thread.Sleep(dots * 1000);

       Console.WriteLine(" [x] Done");

       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
   };
   channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
複製代碼

   使用這個代碼,咱們能夠確定的是,即便你使用Ctrl + C關掉一個正在處理消息的Worker,也不會丟失任何東西。【Worker】被殺死後,未被確認的消息很快就會被退回。


四、忘記確認

    忘記調用BasicAck這是一個常見的錯誤。雖然這是一個簡單的錯誤,但後果是嚴重的。消息會被退回時,你的客戶退出(這可能看起來像是隨機的)可是RabbitMQ將會使用更多的內存保存這些任何延遲確認消息。

    爲了調試這種錯誤,你可使用rabbitmqctl打印messages_unacknowledged字段值:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

    若是是在Window環境下,刪除掉sudo字符就能夠:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged


五、持久性的消息

    咱們已經學會了如何確保即便【消費者】死亡,任務也不會丟失。可是若是RabbitMQ服務器中止了,咱們的任務仍然會丟失的。

    當RabbitMQ退出或死機會清空隊列和消息,除非你告訴它即便宕機也不能丟失任何東西。要確保消息不會丟失,有兩件事情咱們是必須要作的:咱們須要將隊列和消息都標記爲持久的。

    首先,咱們須要確保咱們RabbitMQ歷來都不會損失咱們的的隊列。爲了作到這一點,咱們須要聲明咱們的隊列爲持久化的:spa

channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

    雖然這個命令自己是正確的,它不會起做用在咱們目前的設置中。這是由於咱們已經定義了一個叫hello的隊列,它不是持久化的。RabbitMQ不容許你使用不一樣的參數從新定義一個已經存在的隊列,在任何程序代碼中,都試圖返回一個錯誤。但有一個快速的解決方法-讓咱們聲明一個名稱不一樣的隊列,例如task_queue:命令行

channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

    這行代碼QueueDeclare表示隊列的聲明,建立並打開隊列,這個段代碼須要應用到【生產者】和【消費者】中。

    在這一點上,咱們相信,task_queue隊列不會丟失任何東西即便RabbitMQ重啓了。如今咱們要經過設置IbasicProperties.SetPersistent屬性值爲true來標記咱們的消息持久化的。

var properties = channel.CreateBasicProperties();
properties.Persistent
= true;

     關於消息持久性的注意

     將消息標記爲持久性並不能徹底保證消息不會丟失。雖然該設置告訴RabbitMQ時時刻刻把保存消息到磁盤上,可是這個時間間隔仍是有的,當RabbitMQ已經接受信息但並無保存它,此時還有可能丟失。另外,RabbitMQ不會爲每一個消息調用fsync(2)--它可能只是保存到緩存並無真正寫入到磁盤。雖然他的持久性保證不強,但它咱們簡單的任務隊列已經足夠用了。若是您須要更強的保證,那麼您可使用Publisher Comfirms。

六、公平調度

   你可能已經注意到,調度仍然沒有像咱們指望的那樣的工做。例如,在兩個Workers的狀況下,當全部的奇數消息是沉重的,甚至消息是輕的,一個Worker忙個不停,而另外一個Worker幾乎沒事可作。哎,RabbitMQ對上述狀況一無所知,仍將消息均勻發送。

   發生這種狀況是由於當有消息進入隊列的時候RabbitMQ才僅僅調度了消息。它根本不看【消費者】未確認消息的數量,它只是盲目的把第N個消息發送給第N個【消費者】。

   爲了不上述狀況的發生,咱們可使用prefetchcount = 1的設置來調用BasicQos方法。這個方法告訴RabbitMQ在同一時間不要發送多餘一個消息的數據給某個【Worker】。或者,換句話說,當某個消息處理完畢,而且已經收到了消息確認以後,才能夠繼續發送消息給那個【Worker】。相反,它將把消息分配給給下一個不忙的【Worker】。

channel.BasicQos(0, 1, false);

   注意隊列大小

   若是全部的工人都很忙,你的隊列能夠填滿。你要留意這一點,也許會增長更多的【Worker】,或者有其餘的策略。


七、把全部的代碼放在一塊兒

NewTask.cs類最終的代碼是:

複製代碼
 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4 
 5 class NewTask
 6 {
 7     public static void Main(string[] args)
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.QueueDeclare(queue: "task_queue",
14                                  durable: true,
15                                  exclusive: false,
16                                  autoDelete: false,
17                                  arguments: null);
18 
19             var message = GetMessage(args);
20             var body = Encoding.UTF8.GetBytes(message);
21 
22             var properties = channel.CreateBasicProperties();
23             properties.Persistent = true;
24 
25             channel.BasicPublish(exchange: "",
26                                  routingKey: "task_queue",
27                                  basicProperties: properties,
28                                  body: body);
29             Console.WriteLine(" [x] Sent {0}", message);
30         }
31 
32         Console.WriteLine(" Press [enter] to exit.");
33         Console.ReadLine();
34     }
35 
36     private static string GetMessage(string[] args)
37     {
38         return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
39     }
40 }
複製代碼

Worker.cs完整源碼以下

複製代碼
 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 using System.Threading;
 6 
 7 class Worker
 8 {
 9     public static void Main()
10     {
11         var factory = new ConnectionFactory() { HostName = "localhost" };
12         using(var connection = factory.CreateConnection())
13         using(var channel = connection.CreateModel())
14         {
15             channel.QueueDeclare(queue: "task_queue",
16                                  durable: true,
17                                  exclusive: false,
18                                  autoDelete: false,
19                                  arguments: null);
20 
21             channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
22 
23             Console.WriteLine(" [*] Waiting for messages.");
24 
25             var consumer = new EventingBasicConsumer(channel);
26             consumer.Received += (model, ea) =>
27             {
28                 var body = ea.Body;
29                 var message = Encoding.UTF8.GetString(body);
30                 Console.WriteLine(" [x] Received {0}", message);
31 
32                 int dots = message.Split('.').Length - 1;
33                 Thread.Sleep(dots * 1000);
34 
35                 Console.WriteLine(" [x] Done");
36 
37                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
38             };
39             channel.BasicConsume(queue: "task_queue",
40                                  noAck: false,
41                                  consumer: consumer);
42 
43             Console.WriteLine(" Press [enter] to exit.");
44             Console.ReadLine();
45         }
46     }
47 }
複製代碼

 

使用消息確認和BasicQos方法能夠創建一個工做隊列。持久化的選項可讓咱們的任務隊列保持存活即便RabbitMQ重啓。

   好了,寫完了,翻譯的很差,你們見諒。

  原文地址以下:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

   歡迎你們來探討。

相關文章
相關標籤/搜索