RabbitMQ指南之二:工做隊列(Work Queues)

在上一章的指南中,咱們寫了一個命名隊列:生產者往該命名隊列發送消息、消費從從該命名隊列中消費消息。在本章中,咱們將建立一個工做隊列,用於在多個工做者之間分配耗時的任務。工做隊列(即任務隊列)的主要思想是避免當即執行那些須要等他們執行完成的資源密集型任務。相反,咱們將任務安排在稍後完成。咱們將任務封裝爲消息並將其發送到隊列,後臺運行的工做進程將取出任務並執行完成。若是你啓動了多個工做者,這些任務將在多個工做者之間分享。java

  這個概念也即咱們說的異步,在項目中,有時候一個簡單的Web請求,後臺要作一系統的操做,這時候,若是後臺執行完成以後再給前臺返回消息將會致使瀏覽器頁面等待從而出現假死狀態。所以,一般的作法是,在這個Http請求到後臺,後臺獲取到正確的參數等信息後當即給前臺返回一個成功標誌,而後後臺異步地進行後續的操做。程序員

一、準備

  本章中,咱們將發送字符串消息來模擬複雜的任務。這裏由於沒有一個真實的複雜任務,所以用Thread.sleep()方法來模擬複雜耗時的任務。咱們用字符串中的含點(「.")的數量來表示任務的複雜程度,一個點表示一秒鐘的耗時,例如:一個發送」Hello ...「字符串的任務將會耗時3秒鐘。瀏覽器

  咱們能夠直接將上一章中的Send.java代碼拿過來修改,容許從命令行發送消息。本程序將會把任務調試到工做隊列,所以,咱們將類名改成NewTask.java:緩存

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
複製代碼

  此時完整的NewTask.java代碼爲:bash

1 public class NewTask {
 2 
 3     private final static String QUEUE_NAME = "hello";
 4 
 5     public static void main(String[] argv) throws IOException, TimeoutException {
 6 
 7         ConnectionFactory connectionFactory = new ConnectionFactory();
 8         connectionFactory.setHost("HOST");
 9 
10         try(Connection connection = connectionFactory.newConnection();
11             Channel channel = connection.createChannel()) {
12 
13             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
14 
15             String message = String.join(" ", argv);
16             
17             channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
18             System.out.println(" [x] Sent '" + message + "'");
19         }
20     }
21 }
複製代碼

  以前的Recv.java也要作一些修改:模擬字符串消息中的每一個點耗時1秒鐘,它將處理傳送過來的消息並執行任務,所以,咱們修改成Work.java:服務器

1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 2   String message = new String(delivery.getBody(), "UTF-8");
 3 
 4   System.out.println(" [x] Received '" + message + "'");
 5   try {
 6     doWork(message);
 7   } finally {
 8     System.out.println(" [x] Done");
 9   }
10 };
11 boolean autoAck = true; // acknowledgment is covered below
12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
複製代碼

  咱們模擬執行過程當中耗時的僞任務:app

1 private static void doWork(String task) throws InterruptedException {
2     for (char ch: task.toCharArray()) {
3         if (ch == '.') Thread.sleep(1000);
4     }
5 }
複製代碼

  此時完整的Work.java爲:異步

1 public class Worker {
 2     private final static String TASK_QUEUE_NAME = "hello";
 3 
 4     public static void main(String[] args) throws Exception {
 5 
 6         ConnectionFactory connectionFactory = new ConnectionFactory();
 7         connectionFactory.setHost("HOST");
 8 
 9         Connection connection = connectionFactory.newConnection();
10         Channel channel = connection.createChannel();
11         channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
12 
13         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
14             String message = new String(delivery.getBody(), "UTF-8");
15 
16             System.out.println(" [x] Received '" + message + "'");
17             try {
18                 doWork(message);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             } finally {
22                 System.out.println(" [x] Done");
23             }
24         };
25 
26         boolean autoAck = true; // acknowledgment is covered below
27         channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
28     }
29 
30     private static void doWork(String task) throws InterruptedException {
31         for (char ch: task.toCharArray()) {
32             if (ch == '.') Thread.sleep(1000);
33         }
34     }
35 }
複製代碼

二、循環調度

  使用工做隊列的優勢之一是可以輕鬆地進行並行化操做。假設咱們在作一個後臺日誌收集系統,咱們能夠很容易地增長更多的Worker從而提升系統性能。性能

  首先,咱們同時啓動兩個Worker,一樣地,我這裏也放到IDEA中啓動:測試

  

  接下來,咱們前後啓動5個Task,並分別經過main()參數傳入五個字符串消息:

1 First message.
2 Second message..
3 Third message...
4 Fourth message....
5 Fifth message.....
複製代碼

  

  執行五個發送任務以後,來看一下兩個Worker都接收到了什麼樣的消息:

  

  默認狀況下,RabbitMQ將按順序將每一個消息發送給下一個使用者。平均每一個消費者將獲得相同數量的消息。這種消息的調度方式稱之爲循環調度,你能夠開啓更多的Worker來進行測試。

三、消息回執

  由於消費者執行一個任務會有時間耗時,假設一個消費者在執行一個任務執行一半的時候掛掉了將會怎樣?消息會不會所以丟失?在咱們目前的代碼裏,一旦RabbitMq將一條消息轉發給了一個消費者後,將會當即將消息刪除(注意Worker.java裏的autoAck),所以,在咱們上面例子裏,如kill掉一個正在處理數據的Worker,那麼該數據將會丟失。不只如此,全部那些指派給該Worker的還未處理的消息也會丟失。

  但在實際工做的,咱們並不但願一個Worker掛掉以後就會丟失數據,咱們但願的是:若是該Worker掛掉了,全部轉發給該Worker的消息將會從新轉發給其餘Worker進行處理(包括處理了一半的消息)。爲了確保一條消息永不丟失,RabbitMq支持消息回執。消費者在接收到一條消息,而且成功處理完成以後會給RabbitMq回發一條確認ack確認消息,RabbitMq此時纔會刪除該條消息。

  若是一個Worker正在處理一條消息時掛掉了(信道關閉、鏈接關閉、TCP鏈接丟失),它將沒有機會發送ack回執,RabbitMq就認爲該消息沒有消費成功,因而便會將該消息從新放到隊列中,若是此時有其餘消費者仍是在線狀態,RabbitMq會當即將該條消息再轉發給其餘在線的消費者。這種機制能夠保證任何消息都不會丟失。

  默認狀況下,須要手動進行消息確認,在前面的例子裏,咱們經過autoAck=true顯示地關閉了手動消息確認,所以,RabbitMq將採用自動消息確認的機制。如今,咱們修改咱們的程序,採用手動發送回執的方式,當咱們完成對消息的處理後,再手動發送回執確認:

1 channel.basicQos(1); // accept only one unack-ed message at a time (see below)
 2 
 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 4   String message = new String(delivery.getBody(), "UTF-8");
 5 
 6   System.out.println(" [x] Received '" + message + "'");
 7   try {
 8     doWork(message);
 9   } finally {
10     System.out.println(" [x] Done");
11     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
12   }
13 };
14 boolean autoAck = false;
15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
複製代碼

  ack發送信道必須和接收消息的信道(channel)是同一個,若是嘗試經過一個不一樣的信道發送ack回執,將會拋出channel等級協議異常(官網說會拋出異常,可是我在實際測試中並無拋異常,只是該條消息得不到回執,從而也沒法刪除)。

  一個常見的錯誤是忘了手動回執,雖然只是一個簡單的錯誤,可是帶來的後果倒是嚴重的,它將致使已經消費掉的消費不會被刪除,而且當消費該消息的消費者在退出以後,RabbitMq會將該條消息從新進行轉發,內存將被慢慢耗盡。咱們能夠經過正面的命令來檢查這種錯誤:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
複製代碼

  該命令有三列內容,第一列是在監聽的隊列名稱,第二列是Ready狀態的消息數量,第三列是Unacked的消息數量。

四、消息的持久化

  在3中咱們講解了如何保證當消費者掛掉以後消息不被丟失,可是,若是RabbitMq服務或者部署RabbitMq的服務器掛掉了以後,消息仍然會丟失。當RabbitMq崩潰以後,它將會忘記全部的隊列和消息,除非,有什麼機制讓RabbitMq將隊列信息和消息保存下來。

  要確保消息和隊列不會丟失,咱們必需要確保兩件事情。

  首先,咱們要確保RabbitMq永遠不丟失隊列,要作到這點,咱們在定義的時候就須要告訴RabbitMq它是須要持久化的,經過指定durable參數實現:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
複製代碼

  雖然這個命令自己是正確的,可是在咱們目前它不能工做。由於咱們前面已經定義了一個非持久化的hello隊列,RabbitMq不容許從新定義一個已經存在的隊列(用不一樣的參數),不然會拋出異常:

Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23)
    Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)
複製代碼

  要麼重啓RabbitMq讓該臨時隊列消失,要麼在控制檯將該隊列刪除,或者從新建立一個新的隊列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);
複製代碼

  生產者和消費者要作同步修改。

  上面這一步,咱們保證了隊列(task_quee)的持久化,此時,即使RabbitMq崩潰了也不會丟失該隊列,當RabbitMq重啓後將自動從新加載該隊列。

  其次,咱們須要確保咱們的消息也被持久化,要作到這一點,在生產者發佈消息的時候須要指定消息的屬性爲:PERSISTENT_TEXT_PLAIN。

1 import com.rabbitmq.client.MessageProperties;
2 
3 channel.basicPublish("", "task_queue",
4             MessageProperties.PERSISTENT_TEXT_PLAIN,
5             message.getBytes());
複製代碼

  **注意,**即使設置了消息的持久化屬性也不能保證消息會被100%地寫入到磁盤中,由於RabbitMq在接收到消息和寫入到磁盤不是同步的,有可能消息只是被寫入到緩存中而還沒來和及寫入磁盤的時候,RabbitMq崩潰了,此時也會丟失消息。但不管如何,比前面簡單的消息隊列已經強大了不少。

五、公平調度

  您可能已經注意到,任務調度仍然不能徹底按照咱們但願的方式工做。舉個例子,在只有兩個Worker的環境中,奇數的消息比較重,偶數的消息比較輕時,一個Worker將會一直處於忙碌狀態,而另外一個Worker將會一直處於空閒狀態,但RabbitMq並不知道這種狀況,它會依然均衡地向兩個Worker傳遞消息。

  發生這種狀況是由於,當一個消息進入隊列以後,RabbitMq只是盲目地將該第n個消息轉發給第n個消費者,它並不關注每一個消費者發了多少個回執。

  爲了解決這個問題,咱們能夠經過調用basicQos方法,給它傳入1。這將告訴RabbitMq不要同時給一個隊列轉發多於1條的消息,換句話說,在一個消費者沒有完成並回執前一條消息時,不要再給它轉發其餘消息。

1 int prefetchCount = 1;
2 channel.basicQos(prefetchCount);
複製代碼

六、完整的代碼

  1、NewTask.java

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.MessageProperties;
 5 
 6 public class NewTask {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     try (Connection connection = factory.newConnection();
14          Channel channel = connection.createChannel()) {
15         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
16 
17         String message = String.join(" ", argv);
18 
19         channel.basicPublish("", TASK_QUEUE_NAME,
20                 MessageProperties.PERSISTENT_TEXT_PLAIN,
21                 message.getBytes("UTF-8"));
22         System.out.println(" [x] Sent '" + message + "'");
23     }
24   }
25 
26 }
複製代碼

  2、Worker.java

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.DeliverCallback;
 5 
 6 public class Worker {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     final Connection connection = factory.newConnection();
14     final Channel channel = connection.createChannel();
15 
16     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
17     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
18 
19     channel.basicQos(1);
20 
21     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
22         String message = new String(delivery.getBody(), "UTF-8");
23 
24         System.out.println(" [x] Received '" + message + "'");
25         try {
26             doWork(message);
27         } finally {
28             System.out.println(" [x] Done");
29             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
30         }
31     };
32     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
33   }
34 
35   private static void doWork(String task) {
36     for (char ch : task.toCharArray()) {
37         if (ch == '.') {
38             try {
39                 Thread.sleep(1000);
40             } catch (InterruptedException _ignored) {
41                 Thread.currentThread().interrupt();
42             }
43         }
44     }
45   }
46 }
複製代碼

點關注,不迷路,這是一個程序員都想要關注的公衆號

相關文章
相關標籤/搜索