在上篇揭開RabbitMQ的神祕面紗一文中,咱們編寫了程序來發送和接收來自命名隊列的消息。html
本篇咱們將建立一個工做隊列,工做隊列背後的假設是每一個任務都交付給一個工做者java
本篇是譯文,英文原文請移步:http://www.rabbitmq.com/tutorials/tutorial-two-java.html算法
前提:本教程假定RabbitMQ 已在標準端口(15672)上的localhost上安裝並運行。若是您使用不一樣的主機,端口或憑據,則須要調整鏈接設置。數組
工做隊列(又稱:任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。緩存
相反,咱們安排任務稍後完成。咱們將任務封裝 爲消息並將其發送到隊列。在後臺運行的工做進程將彈出任務並最終執行做業。當您運行許多工做程序時,它們之間將共享任務。bash
這個概念在Web應用程序中特別有用,由於在短的HTTP請求窗口中沒法處理複雜的任務。服務器
如何理解上面這段話呢?app
咱們能夠舉個例子,假設用戶有多個文件上傳請求,然而Web應用對文件上傳進行處理每每是一件比較耗時的操做,是沒法馬上立刻響應返回給客戶端結果,這時候咱們就須要一個工做隊列來處理。ide
再好比生活中的買票,檢票,你們都知道,當咱們買票檢票,大多須要排隊一個一個處理,二者相似。函數
在上篇中咱們發送了一個Hello World 信息,如今咱們將發送複雜任務的字符串。
可是咱們沒有實際的應用場景,因此咱們這裏暫時使用 Thread.sleep() 函數來模擬PDF 文件上傳實現延遲效果。
咱們建立一個生產者(產生消息,發送消息的一方)發佈新的任務,文件名稱叫作NewTask.java:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /*** * 生產者 * ********/ public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { //建立和消息隊列的鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //第二個參數爲true 確保關閉RabbitMQ服務器時執行持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //從命令行發送任意消息 String message = getMessage(argv); //將消息標記爲持久性 - 經過將MessageProperties(實現BasicProperties)設置爲值PERSISTENT_TEXT_PLAIN。 channel.basicPublish("", TASK_QUEUE_NAME,//指定消息隊列的名稱 MessageProperties.PERSISTENT_TEXT_PLAIN,//指定消息持久化 message.getBytes("UTF-8"));//指定消息的字符編碼 //打印生產者發送成功的消息 System.out.println(" [x] Sent '" + message + "'"); //關閉資源 channel.close(); connection.close(); } /*** * 一些幫助從命令行參數獲取消息 * @param strings 從命令行發送任意消息字符串 * */ private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings," "); } /** * 字符串數組 * @param delimiter 分隔符 * */ private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
接下來咱們建立咱們的消費者(工做者),它須要爲消息體中的每一個點僞造一秒鐘的工做。它將處理傳遞的消息並執行任務。
Worker.java
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { //和消息隊列建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //指定消息隊列的名稱 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //在處理並確認前一個消息以前,不要向工做人員發送新消息。相反,它會將它發送給下一個仍然不忙的工人 int prefetchCount = 1 ; channel.basicQos(prefetchCount); //channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //boolean autoAck = false; //channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } /***** * 咱們的假任務是模擬執行時間 * */ private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用任務隊列的一個優勢是可以輕鬆地並行工做。若是咱們正在積壓工做積壓,咱們能夠添加更多工做者,這樣就能夠輕鬆擴展。
首先,讓咱們嘗試同時讓兩個工做者工做。他們都會從隊列中獲取消息,但究竟如何呢?讓咱們來看看。
咱們選中Worker.java 右鍵,Run as -----> Java Application ,執行三次,啓動三個實例。
這樣一來就至關於有了三個工做者(消費者),
而後咱們同理開始嘗試屢次運行生產者,NewTask.java,這樣將產生多個任務
而後咱們能夠清楚在控制檯看到這樣的狀況,
第一個Work.java
第二個Work.java
第三個work.java
Tips: 上面是worker.java 運行了三次,newTask 運行了四次。
也就是說任務有四個,按照循環調度算法,第一個循環到第二個循環,因此有了兩個消息,而其餘消息有了一個消息。
第一個work.java 的控制檯收到了兩條消息後繼續等待收消息
第二個work.java 的控制檯收到了一條消息後也繼續等待接受消息
第三個work.java 的控制檯也收到了 一條消息後繼續等待接受消息
默認狀況下,RabbitMQ將按順序將每條消息發送給下一個消費者。
平均而言,每一個消費者將得到相同數量的消息。這種分發消息的方式稱爲循環法。
Tips: 一個圓圈待表收到的一個消息,一個矩形表明一個Worker 實例
雖然執行任務可能須要幾秒鐘,可是可能咱們會好奇想知道若是其中一個消費者開始執行長任務而且僅在部分完成時死亡會發生什麼。
使用咱們當前的代碼,一旦RabbitMQ向客戶發送消息,它當即將其標記爲刪除。
在這種狀況下,若是你直接關閉一個worker 實例,咱們將丟失它剛剛處理的消息。咱們還將丟失分發給這個特定工做者但還沒有處理的全部消息。
但咱們不想失去任何任務。若是一個worker 工做者實例死亡,咱們但願將任務交付給另外一名工做者Worker。
爲了確保消息永不丟失,RabbitMQ支持 message acknowledgments. (消息確認)。
消費者(工做者)發回一個 ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ能夠自由刪除它。
若是工做者死亡(其通道關閉,鏈接關閉或TCP鏈接丟失)而不發送確認,RabbitMQ將理解消息未徹底處理並將從新排隊。
若是其餘消費者同時在線,則會迅速將其從新發送給其餘消費者。這樣你就能夠確保沒有消息丟失,即便Worker偶爾會死亡。
沒有任何消息超時; 當消費者死亡時,RabbitMQ將從新發送消息。即便處理消息須要很是長的時間,也不要緊。
默認狀況下, Manual message acknowledgments 手動消息已打開。
在前面的示例中,第二個參數咱們經過autoAck = true 標誌明確地將它們關閉。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Tips: true 即autoAck 的值
一旦咱們完成任務,第二個參數就應該將此標誌設置爲false並從工做人員發送適當的確認。
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
使用此代碼,咱們能夠肯定即便您在處理消息時使用CTRL + C殺死一名Worker 實例,也不會丟失任何內容。
由於Worker死後不久,全部未經確認的消息將被從新傳遞。
確認必須在收到的交付的同一信道上發送。嘗試使用不一樣的通道進行確認將致使通道級協議異常. 詳情看 doc guide on confirmations 瞭解更多。
錯過basicAck是一個常見的錯誤。這是一個簡單的錯誤,但後果是嚴重的。(也就是這句話若是忘了寫,後果很嚴重,該消息將一直髮送不出去)
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
當您的客戶端退出時,消息將被從新傳遞(這可能看起來像隨機從新傳遞),但RabbitMQ將會佔用愈來愈多的內存,由於它沒法釋聽任何未經處理的消息。
爲了調試這種錯誤,您可使用rabbitmqctl 來打印messages_unacknowledged字段
Linux 下:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows 下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ服務器中止,咱們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非你告訴它不要。確保消息不會丟失須要作兩件事:咱們須要將隊列和消息都標記爲持久。
首先,咱們須要確保RabbitMQ永遠不會丟失咱們的隊列。爲此,咱們須要聲明它是持久的:
boolean durable = true ; channel.queueDeclare(「hello」,durable,false,false,null);
雖然此命令自己是正確的,但它在咱們當前的設置中不起做用。
那是由於咱們已經定義了一個名爲hello的隊列 ,這個隊列不耐用。
RabbitMQ不容許您使用不一樣的參數從新定義現有隊列,並將向嘗試執行此操做的任何程序返回錯誤。
可是有一個快速的解決方法 - 讓咱們聲明一個具備不一樣名稱的隊列,例如task_queue:
boolean durable = true ; channel.queueDeclare(「task_queue」,durable,false,false,null);
此queueDeclare更改須要應用於生產者和消費者代碼。
此時咱們確信即便RabbitMQ從新啓動,task_queue隊列也不會丟失。
如今咱們須要將消息標記爲持久性 - 經過將MessageProperties(實現BasicProperties)設置爲值PERSISTENT_TEXT_PLAIN。
Note on message persistence
將消息標記爲持久性並不能徹底保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,可是當RabbitMQ接受消息而且還沒有保存消息時,仍然有一個短期窗口。此外,RabbitMQ不會爲每條消息執行fsync(2) - 它可能只是保存到緩存而不是真正寫入磁盤。持久性保證不強,但對於咱們簡單的任務隊列來講已經足夠了。若是您須要更強的保證,那麼您可使用 發佈者確認。
您可能已經注意到調度仍然沒法徹底按照咱們的意願運行。
例如,在有兩個工人的狀況下,當全部奇怪的消息都很重,甚至消息很輕時,一個工人將常常忙碌而另外一個工做人員幾乎不會作任何工做。
好吧,RabbitMQ對此一無所知,仍然會均勻地發送消息。
發生這種狀況是由於RabbitMQ只是在消息進入隊列時調度消息。
它不會查看消費者未確認消息的數量。它只是盲目地向第n個消費者發送每一個第n個消息。
爲了戰勝咱們可使用basicQos方法和 prefetchCount = 1設置。這告訴RabbitMQ不要一次向一個worker發送一條消息。或者,換句話說,在處理並確認前一個消息以前,不要向工做人員發送新消息。相反,它會將它發送給下一個仍然不忙的工人。
int prefetchCount = 1 ; channel.basicQos(prefetchCount);
關於隊列大小的說明
若是全部工做人員都很忙,您的隊列就會填滿。您將須要密切關注這一點,並可能添加更多工做人員,或者採起其餘策略。
使用消息確認和prefetchCount,您能夠設置工做隊列。即便RabbitMQ從新啓動,持久性選項也可使任務生效。
本篇完~