原文地址
html
在第一章中,咱們寫了經過一個queue來發送和接收message的簡單程序。在這一章中,咱們會建立一個workqueue,來將執行時間敏感的任務分發到多個worker中。java
work模式主要的意圖是要避免等待完成一個耗時的任務。取而代之地,咱們延遲任務的執行,將任務封裝成消息,將之發送到queue。一個運行着的worker進程會彈出這個任務並執行它。當運行多個worker進程時,任務會在它們之間分派。linux
這種模式在web應用中特別有用,由於在一個較短的HTTP請求窗口中不會去執行一個複雜的任務。web
在上一章中,咱們發送了一個」Hello World!"的message。如今咱們將發送一個表明了複雜任務的字符串。這不是一個實際的任務,好比像調整圖片大小或是從新渲染pdf文檔,咱們通Thead.sleep() 來模擬一個耗時的任務。message中的小圓點表示其複雜度,圓點越多則任務的執行越耗時。好比「Hello..."的message將耗時3秒。windows
咱們簡單的修改上一章的Send.java代碼,容許在命令行發送任意message。新的類叫作NewTask.javabash
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
一樣的,咱們修改上一章中的Recv.java,讓它在處理message的時候根據小圓點進行睡眠。新的類叫Worker.java服務器
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
像在第一章同樣編譯這兩個類併發
javac -cp $CP NewTask.java Worker.java
使用Task模式的一個明顯的優點是讓並行執行任務變得簡單。咱們只須要啓動更多的worker就能夠消減堆積的message,系統水平擴展簡單。學習
首先,咱們在同一時間啓動兩個worker。他們都會從queue得到message,來看一下具體細節。fetch
打開了三個終端,兩個是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三個終端裏來發布新的任務message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
讓咱們看看worker的處理message的狀況.第一個worker收到了第1,3,5message,第二個worker收到了第2,4個message。
默認狀況下,RabbitMQ會順序的將message發給下一個消費者。每一個消費者會獲得平均數量的message。這種方式稱之爲round-robin(輪詢).
執行任務須要必定的時間。你可能會好奇若是一個worker開始執行任務,可是中途異常退出,會是什麼結果。在咱們如今的代碼中,一旦RabbitMQ將消息發送出去了,它會當即將該message刪除。這樣的話,就可能丟失message。
在實際場景中,咱們不想丟失任何一個task。若是一個worker異常中斷了,咱們但願這個task能分派給另外一個worker。
爲了確保不會丟失message,RabbitMQ採用message確認機制。RabbitMQ只有收到該message的Ack以後,纔會刪除該消息。
若是worker中斷退出了( channel關閉了,connection關閉了,或是TCP鏈接丟失了)而沒有發送Ack,RabbitMQ會認爲該消息沒有完整的執行,會將該消息從新入隊。該消息會被髮送給其餘的worker。這樣就不用message丟失,即便是在worker常常異常中斷退出的場景下。
不會有任何message會timeout。當消費者中斷退出,RabbitMQ會從新分派message。即便消息的執行會花費很長的時間。
默認狀況下,message是須要人工確認的。在上面的例子中,咱們經過autoAck=true來關閉了人工確認。像下面這樣,咱們將該標誌設置爲false,worker就須要在完成了任務以後,發送確認。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代碼保證即便當worker還在處理一條消息,而強制它退出,也不會丟失message。而後不久,全部未被確認的消息都會被從新分派。
發送確認必須和接收相同的channel。使用不一樣的channel進行確認會致使channel-level protocol 異常。
忘記確認消息是一個比較常見的錯誤,可是其後果是很嚴重的。當client退出時,message會被從新分派,可是RabbitMQ會佔用愈來愈多的內存,因它沒法釋放那些未被確認的message。
能夠經過rabbitmqctl來打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱們學習了在消費者出現問題的時候不丟失message。可是若是RabbitMQ服務器宕機了,咱們仍是會丟失message。
當RabbitMQ宕機時,默認狀況下,它會」忘記「全部的queue和message。爲了確保message不丟失,咱們須要確認兩件事情:咱們要使得queue和message都是持久的。
首先,咱們要確保RabbitMQ不會丟失咱們設置好的queue。因此,咱們要把它聲明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然代碼沒有任何問題,可是光這樣是無效的。由於咱們以前已經定義過名字爲hello的queue。RabbitMQ不容許你使用不一樣的參數去從新定義一個已經存在的queue,並且這還不會反悔任何錯誤信息。可是咱們仍是有別的方法,讓咱們使用一個別的名字,好比task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
聲明queue的改變要在生產者和消費者的代碼裏都進行修改。
接着咱們要設置message的持久性,咱們經過設置MessageProperties爲PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
將message標記成持久的不能100%保證message不會丟失,雖然這告訴RabbitMQ將message保存到磁盤,然而在RabbitMQ從接到message到保存之間,仍然有一小段時間。同時RabbitMQ不會給每一條message執行fsync(2) -- 可能只是保存到了cache而沒有寫到磁盤上去。因此持久的保證也不是很是強,而後對咱們簡單的task queue來講則足夠了。若是須要一個很是強的保證,則可使用 發佈確認的方式。
你可能已經注意到分派的工做沒有如咱們所指望的來執行。好比在有2個worker的狀況系,全部偶數的message耗時很長,而全部奇數的message則耗時很短,這樣其中一個worker則一直被分派到偶數的message,而另外一個則一直是奇數的message。RabbitMQ對此並不知曉,進而繼續這樣分派着message。
這樣的緣由是RabbitMQ是在message入queue的時候肯定分派的。它不關心消費者ack的狀況。
咱們能夠經過basicQos方法和prefetchCount(1)來解決這個問題。這個設置是讓RabbitMQ給worker一次一個message。或者這麼說,直到worker處理完以前的message併發送ack,纔給worker下一個message。不然,Rabbitmq會將message發送給其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。若是全部的worker都處於忙碌狀態,queue可能會被裝滿。必須監控queue深度,可能要開啓更多的worker,或者採起其餘的措施。
NewTask.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
Worker.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,來設定work queue。持久化選項則在RabbitMQ重啓後能讓任務得以恢復。