應領導要求 小弟今天也對RabbitMQ的業務邏輯流程簡單的瞭解了一下。從win環境搭建RabbitMQ服務,到代碼測試(單機部署、單生產者、單消費者),一套流程下來,感受都挺正常。但想着好記性不如爛筆頭,因此仍是要儘量的將學到的東西作個筆記。html
單機部署就不介紹了,可參考博客http://www.cnblogs.com/LipeiNet/p/5977028.html,java
這裏我要記錄一下單機的多消費者。網絡
首先是生產者:ide
package mytask; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import com.sun.corba.se.impl.orbutil.threadpool.TimeoutException; import java.io.IOException; /** * @Description:多消費者測試 * @Date: create in 2018-07-13 15:59 * @Author:Reynold-白 */ public class NewTask { private static final String TASK_QUEUE_NAME="task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("localhost"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //分發信息 for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send '"+message+"'"); } channel.close(); connection.close(); } }
消費者1:測試
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class LoanConsumer { public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); /** * 第一個參數:隊列名字, * 第二個參數:隊列是否可持久化即重啓後該隊列是否依然存在, * 第三個參數:該隊列是否時獨佔的即鏈接上來時它佔用整個網絡鏈接, * 第四個參數:是否自動銷燬即當這個隊列再也不被使用的時候即沒有消費者對接上來時自動刪除, * 第五個參數:其餘參數如TTL(隊列存活時間)等。 */ channel.queueDeclare(LoanQueuesConstant.LOAN_QUEUE, true, false, false, null); System.out.println("LoanConsumer Waiting for messages"); //每次從隊列獲取的數量,其實 是MQ推送給消費者的。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Loan Received '" + message + "'"); try { // throw new Exception(); doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("Consumer Done"); //前面的參數表示該數據在隊列中的索引位置,第二個參數 channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; //消息消費完成確認 channel.basicConsume(LoanQueuesConstant.LOAN_QUEUE, autoAck, consumer); } private static void doWork(String task) throws Exception{ try { Thread.sleep(1000); // 暫停1秒鐘模擬服務消費時間 AutoIvsConsumer.autoInvest(10); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
消費者2 與消費者1相同。htm
有一個關鍵的配置:blog
boolean autoAck=false; //消息消費完成確認 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
這段表示,當消費者消費了這條信息後,會主動給隊列推送響應。成功後隊列會從列表中吧該條數據清除。表示成功處理。而 autoAck=false 親測,能夠實現當一臺消費者down機後,隊列沒有收到響應,會回收這條數據,並交給其餘消費者處理。索引
運行結果:rabbitmq
結果能夠看到當work1消費02號時異常停機,而work2繼續消費02號信息。隊列