原文來自 RabbitMQ 英文官網的教程(2.Work Queues),其示例代碼採用了 .NET C# 語言。html
In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.git
在第一篇教程中,咱們編寫了程序從一個具名(已明確命名的)隊列中發送和接收消息。在這一篇中,咱們會在多個工做單元之間建立一個工做隊列來分配耗時的任務。github
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.web
工做隊列(又稱:任務隊列)背後的主旨是爲了不當即執行一項資源密集型任務,從而致使不得不等待它的完成。相反,咱們安排任務稍後再完成。咱們將任務封裝成一個消息併發送給隊列,由一個後臺工做進程負責彈出該任務並最終執行這項工做。若是有多項工做單元在同時運行,這些任務會在它們之間平均分配。shell
This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.緩存
上述這一律念在 Web 應用程序中尤爲有用,由於在一個簡短的 HTTP 請求視窗中幾乎不可能處理一項複雜任務。併發
準備事宜
In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy - by using the Thread.Sleep() function (you will need to add using System.Threading; near the top of the file to get access to the threading APIs). We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.app
在本教程的以前部分,咱們發送了一個包含"Hello World!"的消息。如今,咱們將要發送一個表明複雜任務的字符串。咱們並無一個現實世界的任務,諸如圖片尺寸修整,或者 pdf 文件的渲染,因此讓咱們經過僞造忙碌來模擬它,使用 Thread.Sleep() 函數便可(你須要在文件的頂部追加 using System.Threading 命名空間)。咱們會採起點的字符數量來表達複雜性,每個點將代表一秒鐘的「工做」,好比模擬任務被描述爲「Hello...」時就表示運行了 3 秒鐘。less
We will slightly modify the Send program from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it NewTask:dom
咱們會在以前的示例中輕微地修改發送程序,以容許任意的消息能夠從命令行中被髮送。該程序會安排任務至工做隊列,因此讓咱們給它命名爲 NewTask 吧:
Like tutorial one we need to generate two projects.
如同第一篇教程,咱們須要生成兩個工程項目。
dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
Some help to get the message from the command line argument:
一些從命令行參數獲取消息的幫助類:
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
Our old Receive.cs script also requires some changes: it needs to fake a second of work for every dot in the message body. It will handle messages delivered by RabbitMQ and perform the task, so let's copy it to the Worker project and modify:
咱們在舊的 Receive.cs 代碼中也須要作一些改變:它須要在消息體中針對每個點模擬一秒鐘刻度的工做,同時會處理經由 RabbitMQ 遞送過來的消息以及運行任務,因此讓咱們先把代碼複製到工程項目,並作一些修改:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
Our fake task to simulate execution time:
咱們開始模擬一下仿真執行時間:
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);
循環分發
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
使用任務隊列的其中一項優點就是能夠很容易地開展並行工做。若是(隊列中)出現大量地工做積壓,咱們能夠經過該途徑增添更多的工做單元,因此擴展規模很方便。
First, let's try to run two Worker instances at the same time. They will both get messages from the queue, but how exactly? Let's see.
首先,讓咱們嘗試同時運行兩個工做實例。他們將同時從隊列中獲取消息,不過結果將會如何呢?一塊兒拭目以待吧。
You need three consoles open. Two will run the Worker program. These consoles will be our two consumers - C1 and C2.
你須要開啓 3 個控件臺,其中兩個將用來運行工做程序,它們將成爲消費者 - C1 和 C2。
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:
在第 3 個控制檯,咱們將發佈一個新任務。你一旦啓動消費者程序,新任務就能夠發佈一些信息了:
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."
Let's see what is delivered to our workers:
讓咱們一塊兒看看到底遞送了什麼消息給工做單元:
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
默認狀況下,RabbitMQ 會依次給下一個消費者逐個發送消息。平均算下來,每個消費者將得到相同數量的消息,這種分發消息的方式就被稱做循環。好了,試一試開啓 3 個或更多的工做單元吧。
消息確認
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
執行一項任務會用去一些時間,若是其中一個消費者啓動了一個長任務而且執行到一部分就停止,你可能想知道究竟發生了什麼。鑑於咱們當前的代碼,一旦 RabbitMQ 遞送消息給消費者,它會當即將消息標記爲刪除。這樣的話,若是你停止一個工做單元你將會失去正在處理中的消息。同時,咱們也會失去全部已分發到當前指定工做單元中還沒來得及處理的消息。
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
可是,咱們並不但願失去任何任務,若是一個工做單元停止了,咱們但願這個任務會被遞送給另外一個工做單元。
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
爲了確保一個消息永遠不會丟失,RabbitMQ 提供了消息確認。消費者將回發一個 ack 標識來告知 RabbitMQ 指定的消息已被接收和處理,而後就能夠放心的刪除它。
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
若是一個消費者停止了(信道被關閉、鏈接被關閉,或者是 TCP 鏈接丟失),致使沒有發送 ack 標識,RabbitMQ 將領會到該消息沒有被徹底處理,隨後將對其從新分配。若是有其餘的消息者同時在線,RabbitMQ 會迅速的從新遞送該任務給另外一個消息者。經過該方式,你就能夠確信沒有消息會被遺漏,即便這些工做單元偶然地停止。
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
沒有任何消息會出現超時,RabbitMQ 會在消費者停止時從新遞送消息。即便運行一個消息會花去很是很是長的時間,它仍然能夠運行良好。
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off by setting the autoAck ("automatic acknowledgement mode") parameter to true. It's time to remove this flag and manually send a proper acknowledgment from the worker, once we're done with a task.
默認狀況下,手動的消息確認是打開的。在以前的例子裏,咱們經過設置「自動確認」爲 true 值來顯式的關閉了手動機制。如今是時候刪除這個(自動)標記了,一旦咱們的工做單元完成一個任務的時候,就手動地從工做單元發送一個恰當的確認。
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
使用這段代碼後,咱們能夠確信即便你使用 CTRL + C 命令停止一個正在處理消息的工做單元也不會丟失什麼。這樣,在工做單元停止不久,全部未被確認的消息將會被從新遞送。
被遺忘的確認
It's a common mistake to miss the BasicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.
一個較爲常見的錯誤是忽視了 BasicAck,儘管這種錯誤很低級,但後果至關嚴重。當客戶端退出時,消息會被從新遞送(可能看起來像是隨機地從新遞送),可是 RabbitMQ 會由於沒法釋放那些未經確認的消息而吃掉愈來愈多的內存。
In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:
爲了調試這種類型的錯誤,你可使用 rabbitmqctl 命令來打印 messages_unacknowledged 字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
On Windows, drop the sudo:
在 Windows 平臺上,釋放(執行)該 sudo 命令:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.
咱們已經學習瞭如何確保即便消費者(意外)停止時,也可讓任務不會丟失。可是在 RabbitMQ 服務端中止時,咱們的任務仍然會丟失。
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
當 RabbitMQ 退出或者崩潰時它將遺失隊列和消息,除非你明確告知它不要這麼作。作好兩件必要的事情,也就是將隊列和消息標記爲可持久的,這樣就能夠確保消息不會遺失。
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
首先,咱們須要確保 RabbitMQ 不會丟失隊列,爲了作到這一點,咱們須要聲明它是可持久的:
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
Although this command is correct by itself, it won't work in our present setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for example task_queue:
儘管這個命令的自己是正確的,但在咱們當前的設置中仍不能正常工做,那是由於咱們已經定義過一個未被持久化的名叫「hello」的隊列(參照第一章提到的冪等性)。
RabbitMQ不容許採用不一樣參數從新定義一個已存在的隊列,任何程序試圖這麼作的話將被返回一個錯誤。不過卻是有一個變通方案,讓咱們來聲明一個不一樣名稱的隊列就行了,好比叫「task_queue」:
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
This queueDeclare change needs to be applied to both the producer and consumer code.
該隊列聲明的變動須要同時做用於生產者和消費者兩處的代碼(參考第一章中 Receiving 這一節提到的「嘗試從隊列中消費消息時,確保隊列老是已存在的」,由於沒法保障會先打開哪個終端,因此該隊列聲明的代碼要寫兩處)。
At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting IBasicProperties.SetPersistent to true.
此時此刻,咱們能夠肯定即便 RabbitMQ 重啓了,名爲「task_queue」的隊列也再也不丟失。如今咱們須要經過設置 IBasicProperties.SetPersistent 的值爲 true,從而標識消息爲可持久的。
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
注意消息的持久
Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
將消息標識爲持久並不能徹底保證一個消息也不會被丟失。儘管該標識告訴 RabbitMQ 要將消息保存到磁盤,可是當 RabbitMQ 已經接到一個消息卻還沒有保存它之際,將仍然有一個很小的時間窗口。另外,極可能這還只是保存到了緩存而未實實在在地寫入到磁盤。儘管該持久保障措施還不是很強,但對於咱們簡單的任務隊列已是綽綽有餘。若是你須要一個更強大的保障,可使用發佈者確認機制。
公平分發
You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.
你可能也注意到了,這種分發模式也不如咱們所指望。好比在有兩個工做單元的情景下,(並有多條消息相繼而來),假設奇數項的消息比較冗繁,而偶數項的消息相對輕巧些。這樣,其中一個工做單元將會持續地繁忙,而另外一個工做單元則幾乎不作任何事情。然而,RabbitMQ 並不知情並且還會繼續朝奇數方向分發消息。
This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
之因此發生這樣的情形,是由於當消息進入到隊列時 RabbitMQ 就開始分發,而忽視了消費者這邊未確認消息的數量,它只是盲目地向第 n 個消費者分發每一條消息。
In order to change this behavior we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
爲了改變這種行爲,咱們可使用 BasicQos 方法的 prefetchCount = 1 設置。它將告訴 RabbitMQ 向工做單元分發消息時一次不要超過一個。或者換一句話來說,直到一個工做單元已處理完成並確認過上一個消息時,才把消息發送給它。反之,RabbitMQ 會把消息分發給下一個並不繁忙的工做單元。(從而達到公平分發的效果。)
channel.BasicQos(0, 1, false);
注意隊列大小
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
若是全部的工做單元都很繁忙,你的隊列將會被填滿,這時就須要你密切注視它,也許能夠添加更多的工做單元,或者採起其餘的策略。
融合一塊兒
Final code of our NewTask.cs class:
NewTask.cs 類文件的最終代碼:
using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }
And our Worker.cs:
Worker.cs 類文件:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
Using message acknowledgments and BasicQos you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
使用消息確認和 BasicQos,你能夠創建起一個工做隊列。經過持續化選項,即便 RabbitMQ 重啓也可讓任務繼續存活。