1.在上一節中,咱們寫了一個經過命名隊列發送和消費消息的程序,本節中咱們將建立一個工做隊列。用於在多個工做者之間分配耗時的任務.工做隊列又名任務隊列,它的主要思想是避免將任務分配給正在執行復雜任務的消費者.若是已有的消費者正在執行某些複雜而耗時的任務時,那麼新的消息任務將被堆積在隊列中,並等待現有隊列空閒時纔會把消息發送給消費者.
你能夠啓用多個消費者同時來處理複雜任務.這些任務將在多個消費者之間共享.以上來自官網的翻譯,可是我的感受最後一句話換成:這些任務將會平均分配給多個消費者會更適合.後面的例子你會明白爲何爲何我這樣說.html
2.準備工做:
在上一節的例子中,咱們發送了一個包含"hello rabbitmq"的消息.本節咱們的實例將發送表明複雜耗時任務
的字符串消息.由於咱們並無真實的複雜業務場景,因此在此處經過字符串模擬複雜任務.咱們將經過命令行參數發送帶點(.)的字符串,其中一個點表明這個任務將耗時一秒.這個功能咱們經過Thread.sleep()函數來實現。好比咱們經過命令行參數發送消息message...,則表明此任務將會耗時三秒。java
3.修改上一節中的Send.java代碼中發送的消息內容:git
/發送的消息體 String message = getMessage(args); //接受命令行參數 public static String getMessage(String[] args){ if(args.length < 1){ return "Hello, World!"; } return joinStrings(args, " "); } //組裝命令行參數 public static String joinStrings(String[] args, String delimiter){ int length = args.length; if(length == 0){ return ""; } StringBuilder word = new StringBuilder(args[0]); for(int i = 1; i < length; i ++){ word.append(delimiter).append(args[i]); } return word.toString(); }
兩個很簡單的方法,用戶從命令行獲取參數,而後用指定分隔符拼接成字符串.數組
接下來修改Recv.java中代碼:緩存
//使用信道建立消息消費者 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); }catch (Exception e){ e.printStackTrace(); }finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
public static void doWork(String task) throws InterruptedException{ for(char ch : task.toCharArray()){ if(ch == '.'){ Thread.sleep(1000); } } }
其中doWork方法用來根據.的數量來模擬複雜任務,每一個點程序休眠1秒.服務器
autoAck稍後會說明它的用途。app
此時代碼的修改已完成,爲了和以前的分開,我寫了另外兩個Java文件。tcp
完整的發送和接收代碼以下:ide
NewTask.java(消息生產者)函數
public class NewTask { private static final String QUEUE_NAME = "worker_queue"; public static void main(String[] args) throws Exception{ //建立鏈接工廠,設置鏈接地址,用戶名,密碼 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); //建立鏈接 Connection connection = factory.newConnection(); //經過鏈接建立信道,信道的建立和銷燬代價比鏈接要小的多 Channel channel = connection.createChannel(); //設置隊列是否持久化,默認設置爲false.不開啓持久化 boolean durable = false; //聲明隊列 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); //發送的消息體 String message = getMessage(args); //發送消息,消息體格式爲字節數組 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉鏈接和通道 channel.close(); connection.close(); } /** * 解析命令行參數 * @param args * @return */ public static String getMessage(String[] args){ if(args.length < 1){ return "Hello, World!"; } return joinStrings(args, " "); } /** * 使用指定分隔符對數組分割 * @param args * @param delimiter * @return 拼接後的字符串 */ public static String joinStrings(String[] args, String delimiter){ int length = args.length; if(length == 0){ return ""; } StringBuilder word = new StringBuilder(args[0]); for(int i = 1; i < length; i ++){ word.append(delimiter).append(args[i]); } return word.toString(); } }
消費者(Worker.java)
public class Worker { private static final String QUEUE_NAME = "worker_queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = false; //聲明隊列,消息接受者再次聲明隊列.確保消息隊列存在,隊列的建立是冪等的,存在的隊列不會再次建立 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //使用信道建立消息消費者 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); }catch (Exception e){ e.printStackTrace(); }finally { System.out.println(" [x] Done"); } } }; //設置自動確認爲true.此處的自動確認會致使丟失消息.好比消費者未處理完成而被殺死,連接斷開等等. boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } /** * 判斷消息中.的數量,每一個.休眠一秒,模擬耗時的任務 * @param task * @throws InterruptedException */ public static void doWork(String task) throws InterruptedException{ for(char ch : task.toCharArray()){ if(ch == '.'){ Thread.sleep(1000); } } } }
使用任務隊列的優勢之一即是可以輕鬆建立並行的工做。若是任務過多,咱們只須要增長多個消費者來同時處理更多的任務便可。
下面咱們運行實例,來看看任務隊列的效果:
1.上一節說過,以前的例子中發送方和消費方都作了聲明隊列的操做,
由於沒法肯定先啓動消費者仍是生產者,而此處咱們就會先啓用消費者,再啓動生產者。運行兩次Worker.java中的main方法,啓動兩個消費者
2.NewTask.java啓動前先設置命令行啓動參數,如圖:
Program arguments在第一次運行時設置:First Message.
而後運行NewTask.java以後修改Program arguments,一共發送5次,每次在First Message後追加.
好比第二次發送的命令行參數爲First Message..第三次爲:First Message...依次類推。
而後觀察控制檯輸出結果,以下圖所示:
一共發了5次消息,第一個消費者消費了1.3.5三條消息.(從點的數量便可判斷)
第二個消費者消費了2.4兩條消息。
從結果能夠看出,默認狀況下RabbitMQ採用round-robin(循環)轉發模式
依次將消息平均分發給每個消費者處理。
可是此模式有一個問題:咱們用.來模擬很耗時的任務,一個.耗時1秒,假如我如今
發送三條消息分別是First Message....................此條消息將耗時20秒才能完成,
而後發送第二條First Message...此條消息三秒便可完成。
而後再發送First Message.此條消息1秒便可完成。
按上述流程分發送這幾條消息,你會發現,第一個消費者執行第一條消息耗時20秒.
而第二個消費者執行第二條消息僅僅耗時3秒,
此時再發送第三條僅僅耗時1秒的消息。你會發現,即便第二個消費已經處理完成空閒下來了,
但是RabbitMQ依然將消息發給了第一個消費者,而此時第一個消費者仍然在執行那個20秒的任務
須要等待20秒事後才能執行最後那個僅僅耗時1秒的任務。
在某些場景下,這可能並非咱們須要的結果,咱們但願當多個消費者中某個空閒時就將消息發送給
空閒的消費者處理。
在後邊咱們討論如何實現這種需求。
4.消息確認:
在咱們當前的場景下,模擬複雜任務,執行一個消費者可能會耗時N秒。若是在任務執行期間,
切斷消費者的連接,或者強行終止消費者,會發生什麼?在上邊的例子中,若是咱們執行一個耗時10秒
的任務,在任務執行中強行終止消費者,那麼正在被處理的這條消息將會丟失。
消費者在接收到消息後,隊列中的消息就已經被清空。在真實的業務場景下,這會致使數據的丟失。
爲了不消息的丟失,咱們在消費者回調方法中啓用消息確認機制,通知發送者已經接受併成功處理了消息。只有消費者發送了確認消息,RabbitMQ服務纔會將隊列中的消息清空,這樣就避免了消息丟失的問題。確認機制並無超時的概念,若是消費者的channel斷開,tcp鏈接斷開等,RabbitMQ由於沒有收到消息確認,會將消息緩存在隊列中,發送給下個消費者,若是當前沒有消費者,則一直緩存。直到有消費者來再次消費該消息。
消息確認機制默認啓用,上邊的例子中咱們經過設置boolean autoAck = true關閉了消息確認
解釋:關於消息的自動確認,初看可能有疑惑,設置爲true爲何是關閉了消息確認?我在用代碼作了測試後,說一下本身的理解,其實這裏的消息確認按個人理解指的是生產者只要將消息發送出來,而且消費者收到了,那麼RabbitMQ就默認消息已經成功處理了,作了一次自動確認(清空了隊列中的該條消息)。這是RabbitMQ默認的自動確認機制。這種確認機制的問題就在於:消費者僅僅收到了消息,可是處理是否成功,RabbitMQ卻無論也不問了。若是消費者處理失敗,那麼消息也隨之丟失了。
還有另一種消息確認機制,就是手工確認,在回調中顯示通知RabbitMQ消息處理成功,這種確認機制的好處就是咱們能夠在處理完本身的業務邏輯後顯示通知服務器,業務已經處理完成。此時服務器再刪除隊列中的緩存消息就不會對咱們產生影響了。而將autoAck設置爲true,是打開RabbitMQ自動確認機制,可是卻關閉了顯示通知的機制。因此從這個角度來講,boolean autoAck = true確實是關閉了消息確認,只不過關閉的是在回調中顯示通知的機制。
以上的理解是我的理解,若有誤差還請指正。
下面咱們修改程序讓它支持未確認消息的重發。
要修改的代碼在Worker.java文件中,首先修改回調處理方法
在finally塊中加入以下代碼:
//消息處理完成後程序顯示調用方法來確認給消息發送者. channel.basicAck(envelope.getDeliveryTag(), false);
而後修改boolean autoAck = false;
修改後的完整代碼以下:(此處我新建了一個類,命名爲RequeueWorker.java)
代碼已經有比較詳細的註釋了,再也不解釋代碼
public class RequeueWorker { private static final String QUEUE_NAME = "worker_queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //聲明隊列,消息接受者再次聲明隊列.確保消息隊列存在,隊列的建立是冪等的,存在的隊列不會再次建立 //開啓消息持久化默認爲true,此處還未用到消息持久化,因此隊列聲明時第二個參數直接設置爲false, //當須要持久化時請打開該行代碼的註釋,並將隊列聲明的第二個參數設置爲durable便可。 //boolean durable = true; channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //使用信道建立消息消費者 /** * 設置通道同時處理的最大的消息數量,此處設置爲1,那麼當該消費者有未完成的任務時,並且沒有其餘空閒消費者 * 那麼消息會堆積在隊列中等待處理 */ channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); }catch (Exception e){ e.printStackTrace(); }finally { System.out.println(" [x] Done"); //消息處理完成後程序顯示調用方法來確認給消息發送者. channel.basicAck(envelope.getDeliveryTag(), false); } } }; /** * 設置自動確認爲false.爲false時rabbitmq不會自動確認消息的消費(在發送方未收到消費者 * 確認以前不會刪除隊列中的消息).以此來保證消息不丟失,即便消費者被殺死或斷開鏈接 * 啓動兩個RequeueWorker,發送一個耗時5秒的任務,在任務執行期間斷開正在執行任務的消費者鏈接 * 會發現正在處理的消息被轉發給另外一個空閒的消費者處理了 */ boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } /** * 判斷消息中.的數量,每一個.休眠一秒,模擬耗時的任務 * @param task * @throws InterruptedException */ public static void doWork(String task) throws InterruptedException{ for(char ch : task.toCharArray()){ if(ch == '.'){ Thread.sleep(1000); } } } }
此時啓動兩個消費者,而後啓動生產者。發送一條耗時時間長的任務消息。好比耗時10秒的任務:First Message..........
而後在控制檯觀察哪一個消費者處理了該消息,在消息未完成處理以前強制終止掉該消費者。此時能夠看到未被成功處理的消息已經被重發給另外一個消費處理了。若是正在處理的消費者被終止,而且沒有其餘消費者,那麼消息將會緩存在隊列中。等待其餘消費者來消費,避免消息丟失。
5.消息的持久化:
到此時咱們已經解決了從應用程序層面解決丟失消息的問題,可是依然有其餘可能會形成消息的丟失。好比,RabbitMQ服務掛掉,RabbitMQ服務所在主機宕機、斷電等都會致使消息丟失,爲了應對這些狀況,咱們須要消息的持久化策略。
修改上邊的RequeueWorker.java中的代碼,打開被註釋的行:boolean durable = true,並將
channel.queueDeclare(QUEUE_NAME, false, false, false, null);的第二個參數設置爲durable。
即:channel.queueDeclare(QUEUE_NAME, durable, false, false, null);修改後的代碼以下:
boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
經過將隊列聲明的方法第二個參數設置爲true來讓消息持久化。
注意:以前咱們聲明的隊列都是非持久化的,即第二個參數爲false,RabbitMQ不容許對已有的隊列使用不一樣參數來再次初始化。因此爲了創建持久化隊列,能夠採用兩種辦法:
1.先刪掉已經存在的隊列,再次建立,建立時將其設置爲持久化隊列。
2.取巧的辦法改一下隊列名,從新運行便可。
好比咱們這裏,修改QUEUE_NAME常量改一個不存在的隊列名並將第二個參數設置爲true就能夠了。記住要同時改生產者和消費者。在修改了隊列聲明以後,還須要作一步:修改生產者發送消息的代碼:將消息設置爲可持久化的,修改以下:
//發送消息,消息體格式爲字節數組 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
修改完成,此時即便重啓RabbitMQ或者服務宕機,也能保證消息不丟失了。
說明:
通常來講作到這步已經夠用了,可是這樣的策略依然沒法徹底保證消息不丟失,好比消息緩存到隊列中,而隊列中的消息還沒來得及持久化到硬盤,忽然斷電了。消息依然會丟,雖然機率很低。若是你對消息丟失的策略要求很高。請自行參考https://www.rabbitmq.com/confirms.html 本文不作過多說明。
6.公平轉發(合理轉發):
在上邊的藍色字體部分,咱們說過,MQ的默認轉發機制有時可能不是咱們想要的。咱們但願有多個消費者時,消息能優先發給空閒的消費者處理,這樣不會形成消息過長的等待和消息的堆積。如今來討論如何實現這樣的轉發策略,咱們將之稱爲公平轉發或者合理轉發。
經過設置channel.basicQos(1),在上邊代碼示例中被註釋掉的那行代碼。能夠設置一個消費者在同一時間點只處理一條消息。換句話說:在一個消費者處理完成消息並經過消息確認通知RabbitMQ它已經處理完上一條消息以前,RabbitMQ不會再次給這個隊列發送消息。
注意:採用此策略時,若是你的全部消費者都有任務在處理中,那麼其餘未處理的消息將會堆積在隊列中。隊列是內存中的緩存,因此隊列消息過多的堆積會致使內存佔用的過大。因此請根據本身的業務場景設置合理的參數以及合理的消費者個數來避免消息的堆積佔用過多內存。
此時再次按照藍色字體部分來發送三條消息,會發現,一個隊列當前有任務在處理時,又有新消息過來,RabbitMQ會把消息發給空閒的消費者去處理,不會堆積在有任務的消費者中等待處理了。
關於本節的完整實例代碼請參考: