在第一篇咱們寫了兩個程序經過一個命名的隊列分別發送和接收消息。在這一篇,咱們將建立一個工做隊列在多個工做線程間分發耗時的工做任務。html
工做隊列的核心思想是避免馬上處理資源密集型任務致使必須等待其執行完成。相反的,咱們安排這些任務在稍晚的時間完成。咱們將一個任務封裝爲一個消息並把它發送到隊列中。一個後臺的工做線程將從隊列中取出任務並最終執行。當你運行多個工做線程,這些任務將在這些工做線程間共享。shell
這個概念對於在一個HTTP請求中處理複雜任務的Web應用尤爲有用。函數
在前一篇中,咱們發送了一條內容爲「Hello World!」的消息。如今,咱們將要發送一些表明複雜任務的字符串。咱們並無諸如改變圖片大小或者渲染PDF文件這樣的真實的任務,因此假設任務會致使系統的繁忙--經過使用Threed.Sleep()函數。咱們會採用許多的點(.)在字符串中來表達他的複雜性,每個點將消耗一秒鐘的工做時間。例如,假設有一個任務「Hello...」將消耗3秒鐘。學習
咱們會把上一個例子中的Send.cs文件中的代碼稍微調整一下,使得對任意的消息都能經過命令行發送。這個程序將調度任務到咱們的工做隊列中,因此讓咱們將它命名爲NewTask.cs:fetch
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 4 var properties = channel.CreateBasicProperties(); 5 properties.SetPersistent(true); 6 7 channel.BasicPublish(exchange: "", 8 routingKey: "task_queue", 9 basicProperties: properties, 10 body: body);
獲取命令行消息的幫助方法:spa
1 private static string GetMessage(string[] args) 2 { 3 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); 4 }
舊有的Receive.cs代碼一樣須要稍做修改:須要一個爲消息中每個點模擬一秒的時間消耗。它將會處理RabbitMQ發佈的消息,執行任務,因此咱們稱之爲Worker.cs。命令行
1 var consumer = new EventingBasicConsumer(channel); 2 consumer.Received += (model, ea) => 3 { 4 var body = ea.Body; 5 var message = Encoding.UTF8.GetString(body); 6 Console.WriteLine(" [x] Received {0}", message); 7 8 int dots = message.Split('.').Length - 1; 9 Thread.Sleep(dots * 1000); 10 11 Console.WriteLine(" [x] Done"); 12 13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 14 }; 15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
模擬虛擬任務的執行時間:線程
1 int dots = message.Split('.').Length - 1; 2 Thread.Sleep(dots * 1000);
像第一篇中那樣編譯程序:調試
1 $ csc /r:"RabbitMQ.Client.dll" NewTask.cs 2 $ csc /r:"RabbitMQ.Client.dll" Worker.cs
使用工做隊列的好處之一是可以很輕鬆的並行任務。若是咱們要增強對積壓工做的處理,只須要按照上面的方法添加更多的Worker,很是容易擴展。code
首先,咱們同時運行兩個Worker。它們都會從隊列中獲取消息,可是到底是怎樣作到的呢?讓咱們看看。
你須要打開三個控制檯。兩個運行Worker,這兩個控制檯程序將充當消費者--C1與C2。
1 shell1$ Worker.exe 2 Worker 3 [*] Waiting for messages. To exit press CTRL+C
1 shell2$ Worker.exe 2 Worker 3 [*] Waiting for messages. To exit press CTRL+C
在第三個控制檯程序中,咱們將發佈一些新的任務。一旦你已經運行了消費者,你就能夠發佈新消息了:
1 shell3$ NewTask.exe First message. 2 shell3$ NewTask.exe Second message.. 3 shell3$ NewTask.exe Third message... 4 shell3$ NewTask.exe Fourth message.... 5 shell3$ NewTask.exe Fifth message.....
讓咱們看看有什麼發送到了Worker端:
1 shell1$ Worker.exe 2 [*] Waiting for messages. To exit press CTRL+C 3 [x] Received 'First message.' 4 [x] Received 'Third message...' 5 [x] Received 'Fifth message.....'
1 shell2$ Worker.exe 2 [*] Waiting for messages. To exit press CTRL+C 3 [x] Received 'Second message..' 4 [x] Received 'Fourth message....'
默認狀況下,RabbitMQ會按順序將消息逐個發送到消費者。平均狀況下,每個消費者將會得到相同數量的消息。這種分發消息的方式成爲輪轉調度。可使用三個以上的Worker試一試。
處理一個任務可能花費數秒鐘。你可能會擔憂消費者開始一個較長的任務,可是在完成部分以後就出錯了。在咱們如今的代碼中,一旦RabbitMQ分發了一條消息給消費者它就會立刻在隊列中刪除這條消息。在這樣的狀況下,若是你停止某一個Worker,由於消息正在執行中,咱們將丟失該消息。咱們也將丟失全部分發到該Worker可是未被處理的消息。
可是咱們不想丟失任何一個任務。若是一個Worker停止了,咱們但願這個任務能被分發給其餘Worker。
爲了確保消息毫不丟失,RabbitMQ提供了消息確認機制。消費者回發一個確認給RabbitMQ,告知某個消息已經被接收、處理,而後RabbitMQ就能夠爲所欲爲的刪除它了。
若是一個消費者在沒有回發確認就停止了,RabbitMQ會認爲該消息沒有被徹底的處理,並會將該消息從新分發給其餘的消費者。經過這種方式,你能夠肯定沒有消息會丟失,即便有Worker會不可意料的停止。
沒有消息會超時,RabbitMQ僅僅會在Worker的鏈接停止的時候從新分發消息。即便處理一個消息花費的時間很長很長也不會有什麼關係。
消息確認在默認狀況下是開啓的。在前面的示例中咱們經過將noAck參數設置爲true顯示的關閉了消息確認。如今是時候移除該標記了,使完成一個任務時發回一個恰當的確認。
1 var consumer = new EventingBasicConsumer(channel); 2 consumer.Received += (model, ea) => 3 { 4 var body = ea.Body; 5 var message = Encoding.UTF8.GetString(body); 6 Console.WriteLine(" [x] Received {0}", message); 7 8 int dots = message.Split('.').Length - 1; 9 Thread.Sleep(dots * 1000); 10 11 Console.WriteLine(" [x] Done"); 12 13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 14 }; 15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用這段代碼,咱們能夠確保及時在消費者正在執行時你用CTRL+C強制中斷了程序,也不會丟失任何消息。在消費者停止後不久,全部爲收到確認的消息都將被從新分發。
被遺忘的確認
缺乏BasicAck是一個很是常見的錯誤。這是一個簡單的錯誤,可是後果卻至關嚴重。當客戶端退出的時候,消息會被從新分發(看起來像是隨機分發的),可是RabbitMQ會佔用愈來愈多的內存由於它不能釋放未確認的消息。
爲了調試這種錯誤,你可使用rabbitmqctl打印messages_unacknowledged字段:
1 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 2 Listing queues ... 3 hello 0 0 4 ...done.
咱們已經學習瞭如何確保即便消費者停止,任務也不會丟失。可是若是RabbitMQ服務停止的時候,咱們的任務仍是會丟失。
當RabbitMQ退出或者崩潰,它會忘記存在的隊列和隊列中的消息,除非你告訴它不要這樣。確保消息不丟失,有兩件事情是必須的:咱們必須同時把隊列和消息標記爲持久的(durable)。
首先,咱們須要確保RabbitMQ永遠不會丟失隊列。爲了作到這件事,咱們須要將隊列申明爲持久的:
1 channel.QueueDeclare(queue: "hello", 2 durable: true, 3 exclusive: false, 4 autoDelete: false, 5 arguments: null);
儘管此命令自己是正確的,可是在當前設置下它不會起做用。由於咱們已經定義過一個叫作hello的隊列。RabbitMQ不容許使用不一樣的參數重定義一個已經存在的隊列,任未嘗試作這樣的事情的程序都將返回一個錯誤。可是有一個變通的方法--讓咱們用不一樣的名稱申明一個隊列,例如task_queue:
1 channel.QueueDeclare(queue: "task_queue", 2 durable: true, 3 exclusive: false, 4 autoDelete: false, 5 arguments: null);
這個隊列申明的改變須要被應用於生產者和消費者。
這個時候,咱們能夠肯定即便是RabbitMQ重啓了,task_queue也不會丟失。如今咱們須要把咱們的消息標記爲持久的(persistent),經過把IBasicProperties.SetPersistent設置爲true。
1 var properties = channel.CreateBasicProperties(); 2 properties.SetPersistent(true);
消息持久註記
將消息標記爲持久的並不能徹底的保證消息不會丟失。儘管告知了RabbitMQ將消息保存在磁盤上,仍舊有很短的時間裏RabbitMQ接收到一個消息而且尚未保存。因此RabbitMQ不會對每條消息作fsync--它可能僅僅被存放在Cache中而不是實際寫入到磁盤裏面。消息的持久化保證並不健壯,可是對於簡單的任務隊列已經足夠。若是你須要一個更加健壯的保證,你可使用發佈者確認。
你可能已經注意到調度依舊不能徹底按照咱們指望的方式工做。設想一個有兩個Worker的應用場景,當全部奇數消息都很龐大而偶數消息很輕量的時候,一個Worker老是很是的繁忙而另外一個幾乎不作什麼事情。嗯,RabbitMQ並不會知道這事兒,它依然會平均的分發消息。
出現這種狀況是由於RabbitMQ只是在消息進入隊列後就將其分發。它並不會去檢查每一個消費者所擁有的未肯定消息的數量。它只是不假思索的將第N個消息調度到第N個消費者。
爲了應對這種狀況,咱們可使用basicQos方法而且把參數prefetchCount設置爲1。這將告訴RabbitMQ不要同一時間調度給同一個消費者超過一條消息。或者,當一個消費者正在處理或確認前一個消息時不要將新消息調度給它。相反的,它會把這個消息調度給下一個不忙碌的消費者。
1 channel.BasicQos(0, 1, false);
隊列大小注記
若是全部的消費者都很忙碌,你的隊列可能被填滿。你但願能盯着這個問題,而且添加更多的消費者,或者使用其餘策略。
NewTask.cs類的最終代碼以下:
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class NewTask 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.QueueDeclare(queue: "task_queue", 14 durable: true, 15 exclusive: false, 16 autoDelete: false, 17 arguments: null); 18 19 var message = GetMessage(args); 20 var body = Encoding.UTF8.GetBytes(message); 21 22 var properties = channel.CreateBasicProperties(); 23 properties.SetPersistent(true); 24 25 channel.BasicPublish(exchange: "", 26 routingKey: "task_queue", 27 basicProperties: properties, 28 body: body); 29 Console.WriteLine(" [x] Sent {0}", message); 30 } 31 32 Console.WriteLine(" Press [enter] to exit."); 33 Console.ReadLine(); 34 } 35 36 private static string GetMessage(string[] args) 37 { 38 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); 39 } 40 }
Worker.cs類的最終代碼以下:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 using System.Threading; 6 7 class Worker 8 { 9 public static void Main() 10 { 11 var factory = new ConnectionFactory() { HostName = "localhost" }; 12 using(var connection = factory.CreateConnection()) 13 using(var channel = connection.CreateModel()) 14 { 15 channel.QueueDeclare(queue: "task_queue", 16 durable: true, 17 exclusive: false, 18 autoDelete: false, 19 arguments: null); 20 21 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 22 23 Console.WriteLine(" [*] Waiting for messages."); 24 25 var consumer = new EventingBasicConsumer(channel); 26 consumer.Received += (model, ea) => 27 { 28 var body = ea.Body; 29 var message = Encoding.UTF8.GetString(body); 30 Console.WriteLine(" [x] Received {0}", message); 31 32 int dots = message.Split('.').Length - 1; 33 Thread.Sleep(dots * 1000); 34 35 Console.WriteLine(" [x] Done"); 36 37 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 38 }; 39 channel.BasicConsume(queue: "task_queue", 40 noAck: false, 41 consumer: consumer); 42 43 Console.WriteLine(" Press [enter] to exit."); 44 Console.ReadLine(); 45 } 46 } 47 }
你可使用消息肯定和BasicQos設置一個工做隊列。持久化選項使得消息RabbitMQ重啓的時候也得以保全。
要了解關於IModel和IBasicProperties的更多信息,你能夠瀏覽在線的RabbitMQ .NET客戶端API引用。
如今咱們能夠前進道教程三並瞭解如何向多個消費者發送相同的消息。
原文連接:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html