原文地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.htmlhtml
在第一篇教程中咱們編寫了發送消息到隊列並從中接收消息的程序。在本篇教程中咱們將會建立一個能夠在多個消費者中分發耗時任務的工做隊列(Work Queue)。java
工做隊列(也叫任務隊列)的主要做用是你不須要當即運行一個耗時的任務並等待它完成。相反,咱們能夠將這個任務延後運行。咱們將一個任務(task)封裝成消息併發送到一個隊列。一個後臺運行的工做進程(worker process)將會取出這個任務並運行它。當你運行了多個工做線程時,任務會在它們之中進行分配。git
若是你須要在一個較短的HTTP請求窗口中處理一個複雜任務,這個概念會顯得尤其重要。github
在上一篇教程中咱們發送了一條包含" Hello World! "的消息。此次咱們將發送表明着複雜任務的字符串。因爲咱們並無諸如調整圖片大小或者渲染PDF文件這些真正耗時的任務,因此咱們使用==Thread.sleep()==來模擬。咱們將字符串中"."的數量做爲它的複雜度,每個點表示一秒鐘的「工做」。好比,一個用==Hello...==描述的任務將會消耗三秒鐘。shell
爲了經過命令行發送任意的消息,咱們將對上一篇教程中的Send.java的代碼進行一些修改。這段程序將會把咱們的任務放入工做隊列,因此咱們叫它==NewTask.java==。api
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
咱們以前的Recv.java代碼也須要進行一些修改:它須要把消息體中的每一個「.」當作一秒鐘的任務耗時。它將會接收消息並運行任務,因此咱們把它叫作==Worker.java==。緩存
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
咱們模擬任務耗時的代碼以下所示:安全
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
使用任務隊列的優點之一就是就能夠將任務並行化。若是咱們的任務沒法及時處理致使積壓,能夠很輕鬆的經過增長更多工做線程來解決。bash
首先,讓咱們同時開啓兩個工做線程。它們將同時從隊列中獲取消息,而後咱們看下具體狀況如何。併發
你須要打開三個命令行窗口(譯者注:你能夠IDE完成下列操做,不須要命令行)。其中兩個用來跑工做線程。這些控制檯稱做咱們的消費者——C1和C2.
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
在第三個窗口中咱們發佈新的任務。一旦你啓動好了消費者,你就能夠開始發佈一些消息。
# shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message..' java -cp $CP NewTask Third message... # => [x] Sent 'Third message...' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message....' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message.....'
讓咱們看下咱們的消費者收到了什麼東西:
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默認狀況下,RabbitMQ會依次將消息發給每一個消費者。通常狀況下每一個消費者收到的消息數量是相同的。這種分發消息的方式咱們叫作輪詢(round-robin)。你能夠試下開啓三個或者更多工做線程會有什麼結果。
任務的運行可能須要花費一些時間。你可能想要知道當某個消費者運行了一個耗時很長的任務可是中途失敗的時候會發生什麼。在咱們當前的代碼中,一旦RabbitMQ將某個消息發送給了消費者,它會當即將該消息標記爲刪除。在這種狀況下,若是你殺掉了那個進程咱們就會丟失掉它正在處理的消息。一樣的,咱們會丟失全部發送到當前進程但還未處理的消息。
可是咱們並不想損失任何消息。當一個工做進程掛掉以後,咱們但願其餘的工做進程能夠接受這個任務。
爲了保證咱們的每一條消息都不會丟失,RabbitMQ支持消息確認機制。確認消息是消費者用來告訴RabbitMQ某個消息已經被正確收到並處理,能夠隨時被刪除。
若是某個消費者由於掛掉(channel關閉,connection關閉或者TCP鏈接斷開)而沒有發送確認消息,RabbitMQ會認爲這條信息未處理完成並將它從新入隊。若是此時有其餘消費者存活,它會將消息轉發給對應消費者。經過這種方式你能夠保證消息永遠不會丟失,即便消費者會隨機的掛掉。
當前的消息沒有任何超時設置:RabbitMQ在消費者掛掉的時候始終會從新轉發消息。 即便處理一條消息花費很是很是長的時間也不要緊。
手動消息確認默認是開啓狀態。在前一章的例子中咱們經過設置==autoAck=true==手動關閉了它。如今是時候把它設置爲false,並在任務完成後發送合適的確認消息了。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
經過這段代碼咱們能夠保證即便在任務處理消息的時候使用CTRL+C關閉了進程,也不會有任何消息丟失。工做線程掛掉後不久,全部未確認的消息會被從新發送。
確認消息的發送必須與消息的接收位於同一個channel中。嘗試使用一個不一樣的channel發送確認消息將會收到一個channel級別的協議異常。詳情能夠查看該文檔。
忘記確認
忘記調用basicAck進行確認是一種很常見的錯誤。這個錯誤很簡單,可是後果很嚴重。消息會在你的客戶端退出的時候從新發送(看起來像是隨機發送),可是RabbitMQ會由於一直持有這些未回覆的消息而佔用大量的內存。
你可使用==rabbitmqctl==打印==messages_unacknowledged==參數來調試這個錯誤
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在windwos上,使用下列命令
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱們已經學習了當消費者掛掉時如何保證消息不丟失。可是當RabbitMQ服務端掛掉時,消息依然會丟失。
當RabbitMQ退出或崩潰時,默認會丟掉全部的隊列和消息。若是想要保證消息不丟失咱們須要作兩件事:將隊列和消息都設置爲持久化。
首先,咱們須要保證RabbitMQ永遠不會丟掉咱們的隊列。爲了達到這個目的,咱們須要把隊列聲明爲持久化的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然上邊的代碼自己是正確的,可是卻沒法在咱們當前的設置下使用。由於咱們已經定義了名叫hello的非持久化隊列。RabbitMQ不容許使用不一樣的參數從新聲明已經存在的隊列,而且會返回一個錯誤。最簡單的解決方法就是,從新聲明一個名字不一樣的隊列,好比==task_queue==:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
生產者和消費者都須要修改隊列聲明的代碼。
如今咱們能夠保證即便RabbitMQ重啓,==task_queue==隊列也不會丟失。而後咱們須要將咱們的消息也設置爲持久化的——經過設置MessageProperties的PERSISTENT_TEXT_PLAIN參數:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消息持久化的注意事項
將消息設置爲持久化的並不意味着它必定不會丟失。雖然它告訴RabbitMQ將這條消息保存到磁盤上,可是從接收消息到存到磁盤之間仍有一個短暫的時間窗口。一樣的,RabbitMQ也不會每一個消息都調用==fsync(2)==——此時消息只會被保存在系統緩存而不是真正的寫入磁盤。因此消息的持久化並不是萬無一失,可是對於咱們簡單的任務隊列來講是足夠了。若是你想要一個更加安全的持久化你能夠參考生產者確認
你可能已經注意到消息分發並不像咱們想象中的那樣工做。考慮這樣一個場景,有兩個消費者,RabbitMQ發出的單數消息都很複雜,而雙數的消息都很簡單,這就會致使一個消費者始終處於忙碌狀態,而另外一個消費者幾乎無事可作。然而,RabbitMQ並不知道這種狀況,他依然會均勻的分發消息。
發生這種狀況是由於RabbitMQ在消息進入隊列時才調度消息。 它並不會關注消費者回復確認消息的數量。 它只是盲目地將每第n條消息發送給第n個消費者。
爲了杜絕這種狀況發生,咱們可使用==basicQos==方法設置prefetchCount=1。這會告訴RabbitMQ一次最多隻能給消費者一條消息。或者換句話說,在消費者處理完消息並回復確認以前,否則發送新的消息。這樣的話,RabbitMQ就會將消息分發到另外一個空閒的消費者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意隊列的容量
若是全部的消費者都處於忙碌狀態,你的隊列可能被填滿。你須要注意這一點,或者增長更多消費者,或者使用其餘策略。
==NewTask.java==
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
==Worker.java==
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
經過消息確認機制和 prefetchCount參數你能夠建立一個工做隊列。而持久化選項使得即便 RabbitMQ重啓,消息依然會存在。
若是你想要知道關於 Channel類和 MessageProperties的更多信息,能夠查閱JavaDocs。