若是一個消費者須要處理一個耗時的任務,那麼隊列中其餘的任務將被迫等待這個消費者處理完成,因此爲了不這樣的狀況,能夠創建對個消費者進行工做。java
本例中使用Thread.sleep()方法來僞裝消費者在處理一個耗時的任務。咱們將把字符串中的點的個數做爲其複雜度; 每一個點都將佔「工做」的一秒鐘。例如,由Hello ...描述的假任務 將須要三秒鐘。咱們在啓動這個程序的時候,設置java參數,如 java NewTask hello ...git
定義一個NewTask.java:github
1 package com.rabbitMQ; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class NewTask { 8 9 private final static String QUEUE_NAME = "work"; 10 11 public static void main(String[] args) throws Exception { 12 // 建立鏈接工廠 13 ConnectionFactory factory = new ConnectionFactory(); 14 // 設置鏈接rabbitMQ服務器的ip 15 factory.setHost("localhost"); 16 // factory.setPort(5672); 17 // 建立一個鏈接到服務器的連接 18 Connection connection = factory.newConnection(); 19 // 建立鏈接通道 20 Channel channel = connection.createChannel(); 21 28 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 29 30 String message = getMessage(args); 31 37 channel.basicPublish("", "hello", null, message.getBytes()); 38 39 System.out.println(" [x] Sent '" + message + "'"); 40 41 channel.close(); 42 43 connection.close(); 44 } 45 46 private static String getMessage(String[] strings) { 47 if (strings.length < 1) 48 return "Hello World!"; 49 return joinStrings(strings, " "); 50 } 51 52 private static String joinStrings(String[] strings, String delimiter) { 53 int length = strings.length; 54 if (length == 0) 55 return ""; 56 StringBuilder words = new StringBuilder(strings[0]); 57 for (int i = 1; i < length; i++) { 58 words.append(delimiter).append(strings[i]); 59 } 60 return words.toString(); 61 } 62 63 }
定義一個消費工做者Worker.java:shell
1 package com.rabbitMQ; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 13 /** 14 15 * @author may 16 * 17 */ 18 public class Worker { 19 20 private final static String QUEUE_NAME = "work"; 21 22 public static void main(String[] argv) throws Exception { 23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("localhost"); 25 Connection connection = factory.newConnection(); 26 Channel channel = connection.createChannel(); 27 /** 28 * queue the name of the queue durable true if we are declaring a 29 * durable queue (the queue will survive a server restart) exclusive 30 * true if we are declaring an exclusive queue (restricted to this 31 * connection) autoDelete true if we are declaring an autodelete queue 32 * (server will delete it when no longer in use) arguments other 33 * properties (construction arguments) for the queue 34 */ 35 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 36 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 37 // 定義一個消費者 38 final Consumer consumer = new DefaultConsumer(channel) { 39 @Override 40 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 41 byte[] body) throws IOException { 42 String message = new String(body, "UTF-8"); 43 44 System.out.println(" [x] Received '" + message + "'"); 45 try { 46 doWork(message); 47 } catch (InterruptedException e) { 48 // TODO Auto-generated catch block 49 e.printStackTrace(); 50 } finally { 51 System.out.println(" [x] Done"); 52 } 53 } 54 }; 55 // 異步 56 /** 57 * queue the name of the queue 隊列名 autoAck true if the server should 58 * consider messages acknowledged once delivered; false if the server 59 * should expect explicit acknowledgements callback an interface to the 60 * consumer object 61 * 能夠經過如下命令去查看隊列中沒有返回ack的消息個數 62 * rabbitmqctl list_queues name messages_ready messages_unacknowledged 63 */ 64 boolean autoAck = true; 65 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 66 67 // rabbitmqctl.bat list_queues 能夠列出當前有多少個隊列 68 } 69 70 private static void doWork(String task) throws InterruptedException { 71 for (char ch : task.toCharArray()) { 72 if (ch == '.') 73 Thread.sleep(1000); 74 } 75 } 76 77 }
第70行的doWork方法對收到的消息字符串進行遍歷,有多少個.就會休眠多少秒。以此來模擬耗時任務。windows
啓動兩個work,而後屢次啓動NewTask,每次發送的字符串消息不一樣api
在Linux環境下bash
export CP=.:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
查看兩個work的輸出狀況服務器
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
若是是在windows環境下,那麼使用如下的命令
set CP=.;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar java -cp %CP% NewTask
....(把$CP改爲%CP%,其餘同樣)app
eclipse環境下右鍵run as 選擇run configurations...eclipse
能夠看出,默認狀況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每一個消費者將得到相同數量的消息。這種分發消息的方式叫作round-robin。
執行任務可能須要幾秒鐘。你可能會想,若是一個消費者開始一個很是耗時的任務,而且只運行了一部分時間,就被異常終止了,好比down機。上面的代碼,一旦RabbitMQ向客戶發送消息,它當即將這個消息從內存中刪除。在這種狀況下,若是你殺死一個消費者,咱們將丟失正在處理的消息。咱們還會丟失全部發送給該特定消費者但還沒有處理的消息。
可是咱們不想失去任何任務。若是一個消費者終止,咱們但願把這個任務交給另外一個消費者。
爲了確保消息永遠不會丟失,RabbitMQ支持消息確認。從消費者發送一個確認信息(ack)告訴RabbitMQ已經收到,處理了特定的消息,而且RabbitMQ能夠刪除它。
若是消費者死機(其通道關閉,鏈接關閉或TCP鏈接丟失),而不發送確認信息,RabbitMQ將會知道消息未被徹底處理須要從新排隊。若是同時有其餘消費者在線,則會迅速將其從新提供給另外一個消費者。這樣就能夠確保沒有消息丟失。
爲了防止消費者意外終止形成消息的丟失,咱們能夠設置autoAck爲false,禁止自動確認消息,咱們應該在消息處理成功後手動確認消息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false);//任務處理完成後手動確認消息 } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用這個代碼,咱們能夠肯定即便在處理消息時,使用CTRL + C殺死一個消費者,也不會丟失任何東西。消費者被殺死以後不久,全部未確認的消息將被從新發送。
忘記確認
若是忘記手動確認消息,那麼這些消息將被堆積在隊列中,會消耗內存。咱們能夠經過rabbitmqctl list_queues name messages_ready messages_unacknowledged命令來查看有多少消息未被確認的。
第三個數字就表示相應隊列中已被讀取但未被正確處理的消息有多少個。
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ服務器中止,咱們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會忘記隊列和消息,除非你不告訴它。須要兩件事來確保消息不會丟失:咱們須要將隊列和消息標記爲持久。
首先,咱們須要確保RabbitMQ不會失去咱們的隊列。爲了這樣作,咱們須要將其聲明爲持久的:
boolean durable = true ; channel.queueDeclare(「hello」,durable,false,false,null);
雖然這個命令自己是正確的,可是在咱們目前的設置中是不行的。這是由於咱們已經定義了一個名爲hello的非持久性隊列。RabbitMQ不容許您從新定義具備不一樣參數的現有隊列,並會向嘗試執行此操做的任何程序返回錯誤。可是有一個快速的解決方法 - 讓咱們用不一樣的名稱聲明一個隊列,例如task_queue:
boolean durable = true ; channel.queueDeclare(「task_queue」,durable,false,false,null);
生產者和消費者的queueDeclare都要更改爲持久性隊列。
在這一點上,咱們確信,即便RabbitMQ從新啓動,task_queue隊列也不會丟失。如今咱們須要經過將MessageProperties(實現了BasicProperties)設置PERSISTENT_TEXT_PLAIN來標記咱們的消息是哪一種類型的持久化消息。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish(「」,「task_queue」, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
前面代碼實現的消息隊列是平均地將任務分發給每一個消費者,若是此時有其中一個消費者處理消息很是的耗時,而另外的一個消費者能夠很快地處理完消息,這個時候就出問題了,若是隊列中存在三條消息,rabbitMQ將第一條給了耗時的消費者,把第二條給了不耗時的消費者,最後把第三條給了耗時的消費者,這個時候,耗時的消費者一直在忙碌,而不耗時的消費者沒事幹。
這是由於當消息進入隊列時,RabbitMQ只會盲目地平均分派消息,不會檢查被分派任務的消費者是否已經將消息處理完成。
爲了不這種問題,在消費者的代碼中設置如下代碼。消費者告訴RabbitMQ不要一次性給我多個消息。或者換句話說,在處理並確認前一個消息以前,不要向我發送新消息,你應該將消息發給不忙的其餘消費者。
int prefetchCount = 1 ; channel.basicQos(prefetchCount);
注意隊列大小
若是全部的消費者都忙,隊列會被填滿。這個時候你應該增長新的消費者或者其餘的方式去消耗隊列中的消息。
NewTask.java類的最終代碼:
1 package com.rabbitMQ; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 import com.rabbitmq.client.MessageProperties; 7 8 public class NewTask_fairDispatch { 9 10 private final static String QUEUE_NAME = "task_queue"; 11 12 public static void main(String[] args) throws Exception { 13 // 建立鏈接工廠 14 ConnectionFactory factory = new ConnectionFactory(); 15 // 設置鏈接rabbitMQ服務器的ip 16 factory.setHost("localhost"); 17 // factory.setPort(5672); 18 // 建立一個鏈接到服務器的連接 19 Connection connection = factory.newConnection(); 20 // 建立鏈接通道 21 Channel channel = connection.createChannel(); 22 23 24 boolean durable = true; 25 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 26 27 String message = getMessage(args); 28 29 //將隊列中的信息定義爲可持久化的純文本 30 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 31 32 System.out.println(" [x] Sent '" + message + "'"); 33 34 channel.close(); 35 36 connection.close(); 37 } 38 39 private static String getMessage(String[] strings) { 40 if (strings.length < 1) 41 return "Hello World!"; 42 return joinStrings(strings, " "); 43 } 44 45 private static String joinStrings(String[] strings, String delimiter) { 46 int length = strings.length; 47 if (length == 0) 48 return ""; 49 StringBuilder words = new StringBuilder(strings[0]); 50 for (int i = 1; i < length; i++) { 51 words.append(delimiter).append(strings[i]); 52 } 53 return words.toString(); 54 } 55 56 }
Worker.java:
1 package com.rabbitMQ; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 13 /** 14 * @author may 15 * 16 */ 17 public class Worker_fairDispatch { 18 19 private final static String QUEUE_NAME = "hello"; 20 21 public static void main(String[] argv) throws Exception { 22 ConnectionFactory factory = new ConnectionFactory(); 23 factory.setHost("localhost"); 24 Connection connection = factory.newConnection(); 25 Channel channel = connection.createChannel(); 26 int prefetchCount = 1; 27 //服務傳送的最大消息數量,0表示不限制 28 channel.basicQos(prefetchCount); 29 30 boolean durable = true; 31 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 32 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 33 // 定義一個消費者 34 final Consumer consumer = new DefaultConsumer(channel) { 35 @Override 36 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 37 byte[] body) throws IOException { 38 String message = new String(body, "UTF-8"); 39 40 System.out.println(" [x] Received '" + message + "'"); 41 try { 42 doWork(message); 43 } catch (InterruptedException e) { 44 // TODO Auto-generated catch block 45 e.printStackTrace(); 46 } finally { 47 System.out.println(" [x] Done"); 48 //確認消息,表示任務已經處理完成 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 54 boolean autoAck = false; 55 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 56 57 } 58 59 private static void doWork(String task) throws InterruptedException { 60 for (char ch : task.toCharArray()) { 61 if (ch == '.') 62 Thread.sleep(10000); 63 } 64 } 65 66 }