工做隊列:把每一個任務只發送給一個工做者。 java
上一篇咱們是從一個指定的隊列發送接收消息,在本文中,咱們將建立一個工做隊列,用於在多個工做者之間分配耗時的任務。服務器
工做隊列(即任務隊列)背後的主要思想是避免當即執行資源密集型的任務,而且必須等待任務完成。相反,咱們把任務安排在之後作。咱們將任務封裝爲消息並將其發送到隊列。在後臺運行的worker進程將彈出任務並最終執行任務。當您運行多個worker時,這些任務將在它們之間共享。app
由於工做隊列,是有多個工人從隊列裏面取得任務,咱們就須要考慮較多的問題,好比說,消息怎麼分發,消息沒有傳遞到位如何,消息傳遞過程當中,鏈接斷開如何,消費會不會重複發給兩個工人處理等等。下面咱們先學習一些跟工做隊列相關的概念:ide
循環調度函數
使用任務隊列的優勢之一是可以輕鬆地並行工做。若是咱們正在創建一個積壓的工做,咱們能夠增長更多的工人,這樣,規模就很容易。學習
RabbitMQ將按順序將每一個消息發送給下一個使用者。平均來講,每一個消費者都會收到相同數量的信息。這種分發消息的方式稱爲循環。fetch
消息確認ui
完成一項任務可能須要幾秒鐘。你可能會想,若是其中一個消費者開始了一項很長的任務,而且只完成了部分任務,會發生什麼。使用咱們當前的代碼,一旦RabbitMQ向客戶傳遞消息,它當即標記爲刪除。在這種狀況下,若是你殺了一個工人,咱們將失去它正在處理的信息。咱們還將丟失已經發送給此特定工做人員但還沒有處理的全部消息。code
但咱們不想失去任何任務。若是一個工做者死了,咱們但願把任務交給另外一個工做者。blog
爲了確保消息不會丟失,RabbitMQ支持消息確認。使用者將一個ack(nowledgement)發回給RabbitMQ,告訴它已經接收、處理了一個特定的消息,而且RabbitMQ能夠自由地刪除它。
若是使用者在沒有發送ack的狀況下死亡(通道關閉、鏈接關閉或TCP鏈接丟失),RabbitMQ將理解消息沒有被完整地處理,並將它從新排隊。若是同時有其餘消費者在線,它將很快從新交付給另外一個消費者。這樣你就能夠確保沒有信息丟失,即便工人偶爾也會死亡。
消息的持久性
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ服務器中止,咱們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣作。確保消息不丟失須要兩件事情:咱們須要將隊列和消息標記爲持久的。
首先,咱們須要確保RabbitMQ永遠不會丟失隊列。爲了作到這一點,咱們須要聲明它是持久的.
消息的公平分發機制
爲了不當消息分發後,有的工人很是忙,而有的很閒的問題,咱們可使用basicQos方法,並將prefetchCount = 1設置爲。這告訴RabbitMQ不要一次向工做人員發送多個消息。或者,換句話說,在處理並確認以前的消息以前,不要向工做人員發送新的消息。相反,它將把它分派給下一個不太忙的員工。
上代碼:
生產者:
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class WorkQueueProduct { private final static String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 設置隊列爲可持久性的(生產者和消費者都須要設置)注意:RabbitMQ不容許您從新定義具備不一樣參數的現有隊列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String[] str = {"ans1","b","c","d","e","f","g","h","1","2","3","4","5"}; String message = getMessage(str); // 將消息設置爲持久性,設置消息的其餘屬性爲MessageProperties.PERSISTENT_TEXT_PLAIN,便可 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8")); System.out.println("[x]send:'"+message+"'"); channel.close(); connection.close(); } private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消費者:兩個消費者代碼相同
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class WorkQueueConsumer1 { private static final String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connction = factory.newConnection(); Channel channel = connction.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 設置公平分發機制參數,設置爲1後每次只發送一個消息,而且在沒有發送確認消息以前不會再次發送消息 channel.basicQos(1); // 內置回調函數,處理消息 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body, "utf-8"); System.out.println(" [x] Received '" + message + "'"); try { dowork(message); } catch (Exception e) { // TODO: handle exception }finally{ System.out.println(" [x] Done"); // 任務處理完以後的消息確認 channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false,consumer); } private static void dowork(String task) { // TODO Auto-generated method stub for(char c :task.toCharArray()){ if(c == '.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block Thread.currentThread().interrupt(); } } } } }